[jira] [Commented] (KAFKA-7837) maybeShrinkIsr may not reflect OfflinePartitions immediately

2019-01-17 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745832#comment-16745832
 ] 

Dong Lin commented on KAFKA-7837:
-

[~junrao] Currently `partition.maybeShrinkIsr(...)` reads 
`leaderReplicaIfLocal` to determine whether the partition is still the leader. 
When there is a disk failure, we can also set `partition.leaderReplicaIfLocal = 
None` in `replicaManager.handleLogDirFailure(...)` for every partition on the 
offline disk, so that the broker will no longer shrink the ISR for these 
partitions. I personally feel this approach is probably simpler than accessing 
ReplicaManager.allPartitions().
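
As a rough sketch of that idea (illustrative Java rather than the actual Scala 
broker code; the class shapes and the handleLogDirFailure signature below are 
simplifications, not Kafka's real API):

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: clearing the local-leader reference on log dir failure makes the periodic
// ISR-shrink pass a no-op for that partition, since shrinking only happens while the
// broker still believes it is the leader.
class PartitionSketch {
    volatile Object leaderReplicaIfLocal = new Object(); // null means "not the local leader"

    void maybeShrinkIsr() {
        if (leaderReplicaIfLocal == null) {
            return; // offline (or no longer led locally): leave the ISR alone
        }
        // ... evaluate follower lag and possibly write a smaller ISR to ZK ...
    }
}

class ReplicaManagerSketch {
    final Map<String, PartitionSketch> allPartitions = new ConcurrentHashMap<>();

    void handleLogDirFailure(Set<String> partitionsOnFailedDir) {
        for (String tp : partitionsOnFailedDir) {
            PartitionSketch p = allPartitions.get(tp);
            if (p != null) {
                p.leaderReplicaIfLocal = null; // broker stops shrinking the ISR for this partition
            }
        }
        // ... propagate the log dir failure, close file handles, etc. ...
    }
}
{code}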

> maybeShrinkIsr may not reflect OfflinePartitions immediately
> 
>
> Key: KAFKA-7837
> URL: https://issues.apache.org/jira/browse/KAFKA-7837
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is 
> supposed to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(), 
> we iterate through all non-offline partitions to shrink the ISR. If an ISR 
> needs to shrink, we need to write the new ISR to ZK, which can take a bit of 
> time. In this window, some partitions could now be marked as offline, but may 
> not be picked up by the iterator since it only reflects the state at that 
> point. This can cause all in-sync followers to be dropped out of ISR 
> unnecessarily and prevent a clean leader election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7836) The propagation of log dir failure can be delayed due to slowness in closing the file handles

2019-01-17 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745816#comment-16745816
 ] 

Dong Lin commented on KAFKA-7836:
-

[~junrao] This solution sounds good to me.

> The propagation of log dir failure can be delayed due to slowness in closing 
> the file handles
> -
>
> Key: KAFKA-7836
> URL: https://issues.apache.org/jira/browse/KAFKA-7836
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> In ReplicaManager.handleLogDirFailure(), we call 
> zkClient.propagateLogDirEvent after  logManager.handleLogDirFailure. The 
> latter closes the file handles of the offline replicas, which could take time 
> when the disk is bad. This will delay the new leader election by the 
> controller. In one incident, we have seen the closing of file handles of 
> multiple replicas taking more than 20 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2019-01-17 Thread Ivan Ponomarev (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev updated KAFKA-5488:
--
Labels: pull-request-available  (was: needs-kip)

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: pull-request-available
>
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> code that is hard to maintain, since you have to know the right index for an 
> unnamed branching stream.
> Example
> {code:lang=java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream[] branchedStreams = new KStreamBuilder()
>             .stream("eventTopic")
>             .branch(
>                 (k, v) -> EventType::validData,
>                 (k, v) -> true
>             );
> 
>         branchedStreams[0]
>             .to("topicValidData");
> 
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, something like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> the branch/stream code nested where it belongs,
> so it would be possible to write code like
> {code:lang=java}
> new KStreamBuilder()
>     .stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> EventType::validData,
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );
> {code}
> I'll go forward to evaluate some ideas:
> https://gitlab.com/childno.de/apache_kafka/snippets/1665655



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-01-17 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745716#comment-16745716
 ] 

Dmitry Minkovsky edited comment on KAFKA-5998 at 1/18/19 2:26 AM:
--

I'm getting this as well:

{{[2019-01-18 02:10:07,709] WARN 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager:295) task 
[2_1] Failed to write offset checkpoint file to 
/opt/streams-state/user-service/2_1/.checkpoint: java.io.FileNotFoundException: 
/opt/streams-state/user-service/2_1/.checkpoint.tmp (No such file or directory) 
}}
 {{ [2019-01-18 02:10:07,790] WARN 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager:295) task 
[2_1] Failed to write offset checkpoint file to 
/opt/streams-state/user-service/2_1/.checkpoint: java.io.FileNotFoundException: 
/opt/streams-state/user-service/2_1/.checkpoint.tmp (No such file or 
directory)}}

 

When I look in the state dir:

 
{quote}root@user-service-659fcb455-2fkxp:/opt/streams-state/user-service# ls 
-alh

total 48K

drwxr-xr-x 12 root root 4.0K Jan 18 01:50 .

drwxrwxrwx  3 root root 4.0K Jan 18 01:30 ..

drwxr-xr-x  4 root root 4.0K Jan 18 02:10 0_0

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_1

drwxr-xr-x  4 root root 4.0K Jan 18 02:10 0_2

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_3

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_4

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_0

drwxr-xr-x  3 root root 4.0K Jan 18 02:10 1_1

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_2

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_3

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_4

root@user-service-659fcb455-2fkxp:/opt/streams-state/user-service#
{quote}
 

Not sure what that task 2_1 is. Will have to add some logging to check. 


was (Author: dminkovsky):
I'm getting this as well:

{{[2019-01-18 02:10:07,709] WARN 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager:295) task 
[2_1] Failed to write offset checkpoint file to 
/opt/streams-state/user-service/2_1/.checkpoint: java.io.FileNotFoundException: 
/opt/streams-state/user-service/2_1/.checkpoint.tmp (No such file or directory) 
}}
{{ [2019-01-18 02:10:07,790] WARN 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager:295) task 
[2_1] Failed to write offset checkpoint file to 
/opt/streams-state/user-service/2_1/.checkpoint: java.io.FileNotFoundException: 
/opt/streams-state/user-service/2_1/.checkpoint.tmp (No such file or 
directory)}}

 

When I look in the state dir:

 

root@user-service-659fcb455-2fkxp:/opt/streams-state/user-service# ls -alh

total 48K

drwxr-xr-x 12 root root 4.0K Jan 18 01:50 .

drwxrwxrwx  3 root root 4.0K Jan 18 01:30 ..

drwxr-xr-x  4 root root 4.0K Jan 18 02:10 0_0

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_1

drwxr-xr-x  4 root root 4.0K Jan 18 02:10 0_2

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_3

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_4

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_0

drwxr-xr-x  3 root root 4.0K Jan 18 02:10 1_1

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_2

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_3

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_4

root@user-service-659fcb455-2fkxp:/opt/streams-state/user-service#

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... They have 
> been running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions; I have 
> 16 partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> 

[jira] [Commented] (KAFKA-5998) /.checkpoint.tmp Not found exception

2019-01-17 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745716#comment-16745716
 ] 

Dmitry Minkovsky commented on KAFKA-5998:
-

I'm getting this as well:

{{[2019-01-18 02:10:07,709] WARN 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager:295) task 
[2_1] Failed to write offset checkpoint file to 
/opt/streams-state/user-service/2_1/.checkpoint: java.io.FileNotFoundException: 
/opt/streams-state/user-service/2_1/.checkpoint.tmp (No such file or directory) 
}}
{{ [2019-01-18 02:10:07,790] WARN 
(org.apache.kafka.streams.processor.internals.ProcessorStateManager:295) task 
[2_1] Failed to write offset checkpoint file to 
/opt/streams-state/user-service/2_1/.checkpoint: java.io.FileNotFoundException: 
/opt/streams-state/user-service/2_1/.checkpoint.tmp (No such file or 
directory)}}

 

When I look in the state dir:

 

root@user-service-659fcb455-2fkxp:/opt/streams-state/user-service# ls -alh

total 48K

drwxr-xr-x 12 root root 4.0K Jan 18 01:50 .

drwxrwxrwx  3 root root 4.0K Jan 18 01:30 ..

drwxr-xr-x  4 root root 4.0K Jan 18 02:10 0_0

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_1

drwxr-xr-x  4 root root 4.0K Jan 18 02:10 0_2

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_3

drwxr-xr-x  4 root root 4.0K Jan 18 01:30 0_4

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_0

drwxr-xr-x  3 root root 4.0K Jan 18 02:10 1_1

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_2

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_3

drwxr-xr-x  3 root root 4.0K Jan 18 01:30 1_4

root@user-service-659fcb455-2fkxp:/opt/streams-state/user-service#

> /.checkpoint.tmp Not found exception
> 
>
> Key: KAFKA-5998
> URL: https://issues.apache.org/jira/browse/KAFKA-5998
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.0, 0.11.0.1
>Reporter: Yogesh BG
>Priority: Major
> Attachments: 5998.v1.txt, 5998.v2.txt, Topology.txt, exc.txt, 
> props.txt, streams.txt
>
>
> I have one Kafka broker and one Kafka Streams instance running... They have 
> been running for two days under a load of around 2500 msgs per second. On the 
> third day I am getting the below exception for some of the partitions; I have 
> 16 partitions and only 0_0 and 0_1 give this error
> {{09:43:25.955 [ks_0_inst-StreamThread-6] WARN  
> o.a.k.s.p.i.ProcessorStateManager - Failed to write checkpoint file to 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint:
> java.io.FileNotFoundException: 
> /data/kstreams/rtp-kafkastreams/0_1/.checkpoint.tmp (No such file or 
> directory)
> at java.io.FileOutputStream.open(Native Method) ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:221) 
> ~[na:1.7.0_111]
> at java.io.FileOutputStream.(FileOutputStream.java:171) 
> ~[na:1.7.0_111]
> at 
> org.apache.kafka.streams.state.internals.OffsetCheckpoint.write(OffsetCheckpoint.java:73)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.checkpoint(ProcessorStateManager.java:324)
>  ~[rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:267)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:260)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:254)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:322)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:415)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:314)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.commitAll(StreamThread.java:700)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:683)
>  [rtp-kafkastreams-1.0-SNAPSHOT-jar-with-dependencies.jar:na]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:523)
>  

[jira] [Assigned] (KAFKA-7838) improve logging in Partition.maybeShrinkIsr()

2019-01-17 Thread Dhruvil Shah (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7838?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-7838:
---

Assignee: Dhruvil Shah

> improve logging in Partition.maybeShrinkIsr()
> -
>
> Key: KAFKA-7838
> URL: https://issues.apache.org/jira/browse/KAFKA-7838
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dhruvil Shah
>Priority: Major
>
> When we take a replica out of ISR, it would be useful to further log the 
> fetch offset of the out of sync replica and the leader's HW at the point. 
> This could be useful when the admin needs to manually enable unclean leader 
> election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6266) Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of __consumer_offsets-xx to log start offset 203569 since the checkpointed offset 120955 is inval

2019-01-17 Thread Zack Kobza (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745701#comment-16745701
 ] 

Zack Kobza commented on KAFKA-6266:
---

We recently encountered the same issue of a broker continuously logging:
{noformat}
Resetting first dirty offset of __consumer_offsets-33 to log start offset 
97771750116 since the checkpointed offset 97760913467 is invalid. 
(kafka.log.LogCleanerManager$)
{noformat}
And the broker's log cleaner was stuck in a loop reprocessing 
__consumer_offsets-33 without moving on to other topic-partitions.

I verified the contents of the {{cleaner-offset-checkpoint}} file, used by the 
log cleaner manager, and discovered that the last cleaned offset in the file was 
different from what was reported:
{noformat}
$ grep "__consumer_offsets 33" /data/2/kafka/cleaner-offset-checkpoint
__consumer_offsets 33 97771750116 #>>> offset from log message 97760913467
{noformat}
I found a duplicate entry in another {{cleaner-offset-checkpoint}} file:
{noformat}
$ find /data -name cleaner-offset-checkpoint | xargs grep "__consumer_offsets 
33"
/data/2/kafka/cleaner-offset-checkpoint:__consumer_offsets 33 97771750116
/data/3/kafka/cleaner-offset-checkpoint:__consumer_offsets 33 97760913467
{noformat}
I deleted the duplicate entry in {{/data/3/kafka/cleaner-offset-checkpoint}} 
(the __consumer_offsets-33 logs are located in {{/data/2/kafka}}), updated the 
file's entry-count value, and restarted the broker. No WARNs were logged and the 
log cleaner successfully cleaned all consumer offsets partitions.

I do not know how the duplicate entry was created. We also upgraded from Kafka 
0.11.0.1 to 1.1.1 recently.
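
For anyone hitting the same symptom, here is a small sketch of how the 
duplicates can be spotted programmatically. It assumes the usual 
{{cleaner-offset-checkpoint}} layout (a version line, an entry-count line, then 
one "topic partition offset" entry per line) and hard-codes two example log 
dirs; adjust the paths to your {{log.dirs}}:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Scans several log dirs and reports topic-partitions that appear in more than one
// cleaner-offset-checkpoint file, which is the situation described above.
public class FindDuplicateCleanerCheckpointEntries {
    public static void main(String[] args) throws IOException {
        List<String> logDirs = List.of("/data/2/kafka", "/data/3/kafka"); // example paths
        Map<String, String> firstSeenIn = new HashMap<>();
        for (String dir : logDirs) {
            Path checkpoint = Paths.get(dir, "cleaner-offset-checkpoint");
            if (!Files.exists(checkpoint)) {
                continue;
            }
            List<String> lines = Files.readAllLines(checkpoint);
            if (lines.size() <= 2) {
                continue; // nothing beyond the version and entry-count lines
            }
            // line 0 is the format version, line 1 is the entry count; entries follow
            for (String line : lines.subList(2, lines.size())) {
                String[] parts = line.trim().split("\\s+");
                if (parts.length < 3) {
                    continue;
                }
                String topicPartition = parts[0] + "-" + parts[1];
                String previous = firstSeenIn.putIfAbsent(topicPartition, dir);
                if (previous != null) {
                    System.out.printf("%s has checkpoint entries in both %s and %s%n",
                            topicPartition, previous, dir);
                }
            }
        }
    }
}
{code}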

> Kafka 1.0.0 : Repeated occurrence of WARN Resetting first dirty offset of 
> __consumer_offsets-xx to log start offset 203569 since the checkpointed 
> offset 120955 is invalid. (kafka.log.LogCleanerManager$)
> --
>
> Key: KAFKA-6266
> URL: https://issues.apache.org/jira/browse/KAFKA-6266
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.0.0, 1.0.1
> Environment: CentOS 7, Apache kafka_2.12-1.0.0
>Reporter: VinayKumar
>Priority: Major
>
> I upgraded Kafka from 0.10.2.1 to 1.0.0. Since then, I see the below warnings 
> in the log.
>  I'm seeing these continuously in the log, and want them fixed so that they 
> won't repeat. Can someone please help me fix the below warnings?
> {code}
> WARN Resetting first dirty offset of __consumer_offsets-17 to log start 
> offset 3346 since the checkpointed offset 3332 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-23 to log start 
> offset 4 since the checkpointed offset 1 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-19 to log start 
> offset 203569 since the checkpointed offset 120955 is invalid. 
> (kafka.log.LogCleanerManager$)
>  WARN Resetting first dirty offset of __consumer_offsets-35 to log start 
> offset 16957 since the checkpointed offset 7 is invalid. 
> (kafka.log.LogCleanerManager$)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7838) improve logging in Partition.maybeShrinkIsr()

2019-01-17 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7838:
--

 Summary: improve logging in Partition.maybeShrinkIsr()
 Key: KAFKA-7838
 URL: https://issues.apache.org/jira/browse/KAFKA-7838
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


When we take a replica out of ISR, it would be useful to further log the fetch 
offset of the out of sync replica and the leader's HW at the point. This could 
be useful when the admin needs to manually enable unclean leader election.
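
As an illustration only (the exact wording and fields would be settled in the 
patch; the method and parameter names below are hypothetical), the enriched 
message could carry the follower's fetch offset and the leader's high watermark 
roughly like this:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical shape of an enriched ISR-shrink log line.
class IsrShrinkLogging {
    private static final Logger log = LoggerFactory.getLogger(IsrShrinkLogging.class);

    void logShrink(String topicPartition, int replicaId, long replicaFetchOffset, long leaderHw) {
        log.info("Shrinking ISR for partition {}: removing replica {} whose fetch offset {} is "
                + "behind the leader's HW {}", topicPartition, replicaId, replicaFetchOffset, leaderHw);
    }
}
{code}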



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745676#comment-16745676
 ] 

ASF GitHub Bot commented on KAFKA-7698:
---

mingaliu commented on pull request #5992: KAFKA-7698: Kafka Broker fail to 
start: ProducerFencedException throw…
URL: https://github.com/apache/kafka/pull/5992
 
 
   If ValidationType is None, also skip the check in appendEndTxnMarker 
(similar to append).
   
   Verified with existing unit tests and our daily operation.
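
Very loosely, the gist of the change (illustrative Java; the real code lives in 
Scala in ProducerStateManager and uses different names) is to guard the epoch 
comparison behind the validation type for end-transaction markers, as is already 
done for regular appends:

{code:java}
// Illustrative only: skip the producer-epoch check when no validation is requested.
enum ValidationTypeSketch { NONE, OFFSET_ONLY, FULL }

class EndTxnMarkerAppendSketch {
    void appendEndTxnMarker(ValidationTypeSketch validation, short requestEpoch, short serverEpoch) {
        if (validation != ValidationTypeSketch.NONE && requestEpoch < serverEpoch) {
            throw new IllegalStateException("Producer's epoch is no longer valid. "
                    + requestEpoch + " (request epoch), " + serverEpoch + " (server epoch)");
        }
        // ... append the marker ...
    }
}
{code}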
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Broker fail to start: ProducerFencedException thrown from 
> producerstatemanager.scala!checkProducerEpoch 
> --
>
> Key: KAFKA-7698
> URL: https://issues.apache.org/jira/browse/KAFKA-7698
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Ming Liu
>Priority: Major
>  Labels: easyfix
>
> During our operation of Kafka, we frequently saw this failure: 
>    There was an error in one of the threads during logs loading: 
> org.apache.kafka.common.errors.ProducerFencedException:
> {code:java}
> [06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] 
> Loading producer state from snapshot file 
> '/data/disk5/kafka/interaction_events-127/092130764817.snapshot' 
> (kafka.log.ProducerStateManager)
> [06:57:09,698] INFO [Log partition=interaction_events-127, 
> dir=/data/disk5/kafka] Completed load of log with 11 segments, log start 
> offset 91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
> [06:57:09,701] ERROR There was an error in one of the threads during logs 
> loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's 
> epoch is no longer valid. There is probably another producer with a newer 
> epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
> [06:57:09,705] INFO [ProducerStateManager 
> partition=client_interaction_events_authorid_enrichment-20] Writing producer 
> snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
> [06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer 
> startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 63 
> (request epoch), 66 (server epoch)
>  {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7837) maybeShrinkIsr may not reflect OfflinePartitions immediately

2019-01-17 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745680#comment-16745680
 ] 

Jun Rao commented on KAFKA-7837:


One way to improve this is that in maybeShrinkIsr(), for each iterated 
partition, we re-get the partition from allPartitions to check if it's offline 
at that time. [~lindong], do you think this is reasonable?
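
Roughly, the idea looks like this (illustrative Java rather than the broker's 
actual Scala; the map and the {{offline}} flag are stand-ins for the real data 
structures):

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Re-resolve each partition from the live map right before shrinking, so partitions
// that went offline while earlier iterations were busy writing to ZK are skipped.
class MaybeShrinkIsrSketch {
    final Map<String, PartitionState> allPartitions = new ConcurrentHashMap<>();

    void maybeShrinkIsr() {
        for (String tp : allPartitions.keySet()) {
            PartitionState current = allPartitions.get(tp); // re-get: reflects the latest state
            if (current == null || current.offline) {
                continue; // became offline after the iteration started
            }
            current.shrinkIsrIfNeeded(); // may block on a ZK write
        }
    }

    static class PartitionState {
        volatile boolean offline;

        void shrinkIsrIfNeeded() {
            // evaluate follower lag and, if needed, write the new ISR to ZK
        }
    }
}
{code}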

> maybeShrinkIsr may not reflect OfflinePartitions immediately
> 
>
> Key: KAFKA-7837
> URL: https://issues.apache.org/jira/browse/KAFKA-7837
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> When a partition is marked offline due to a failed disk, the leader is 
> supposed to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(), 
> we iterate through all non-offline partitions to shrink the ISR. If an ISR 
> needs to shrink, we need to write the new ISR to ZK, which can take a bit of 
> time. In this window, some partitions could now be marked as offline, but may 
> not be picked up by the iterator since it only reflects the state at that 
> point. This can cause all in-sync followers to be dropped out of ISR 
> unnecessarily and prevent a clean leader election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7837) maybeShrinkIsr may not reflect OfflinePartitions immediately

2019-01-17 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7837:
--

 Summary: maybeShrinkIsr may not reflect OfflinePartitions 
immediately
 Key: KAFKA-7837
 URL: https://issues.apache.org/jira/browse/KAFKA-7837
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


When a partition is marked offline due to a failed disk, the leader is supposed 
to not shrink its ISR any more. In ReplicaManager.maybeShrinkIsr(), we iterate 
through all non-offline partitions to shrink the ISR. If an ISR needs to 
shrink, we need to write the new ISR to ZK, which can take a bit of time. In 
this window, some partitions could now be marked as offline, but may not be 
picked up by the iterator since it only reflects the state at that point. This 
can cause all in-sync followers to be dropped out of ISR unnecessarily and 
prevent a clean leader election.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7698) Kafka Broker fail to start: ProducerFencedException thrown from producerstatemanager.scala!checkProducerEpoch

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745675#comment-16745675
 ] 

ASF GitHub Bot commented on KAFKA-7698:
---

mingaliu commented on pull request #5992: KAFKA-7698: Kafka Broker fail to 
start: ProducerFencedException throw…
URL: https://github.com/apache/kafka/pull/5992
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Broker fail to start: ProducerFencedException thrown from 
> producerstatemanager.scala!checkProducerEpoch 
> --
>
> Key: KAFKA-7698
> URL: https://issues.apache.org/jira/browse/KAFKA-7698
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Ming Liu
>Priority: Major
>  Labels: easyfix
>
> During our operation of Kafka, we frequently saw this failure: 
>    There was an error in one of the threads during logs loading: 
> org.apache.kafka.common.errors.ProducerFencedException:
> {code:java}
> [06:57:09,697] INFO [ProducerStateManager partition=interaction_events-127] 
> Loading producer state from snapshot file 
> '/data/disk5/kafka/interaction_events-127/092130764817.snapshot' 
> (kafka.log.ProducerStateManager)
> [06:57:09,698] INFO [Log partition=interaction_events-127, 
> dir=/data/disk5/kafka] Completed load of log with 11 segments, log start 
> offset 91975003024 and log end offset 92130764817 in 12701 ms (kafka.log.Log)
> [06:57:09,701] ERROR There was an error in one of the threads during logs 
> loading: org.apache.kafka.common.errors.ProducerFencedException: Producer's 
> epoch is no longer valid. There is probably another producer with a newer 
> epoch. 63 (request epoch), 66 (server epoch) (kafka.log.LogManager)
> [06:57:09,705] INFO [ProducerStateManager 
> partition=client_interaction_events_authorid_enrichment-20] Writing producer 
> snapshot at offset 92418754384 (kafka.log.ProducerStateManager)
> [06:57:09,707] ERROR [KafkaServer id=2] Fatal error during KafkaServer 
> startup. Prepare to shutdown (kafka.server.KafkaServer)
> org.apache.kafka.common.errors.ProducerFencedException: Producer's epoch is 
> no longer valid. There is probably another producer with a newer epoch. 63 
> (request epoch), 66 (server epoch)
>  {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7836) The propagation of log dir failure can be delayed due to slowness in closing the file handles

2019-01-17 Thread Jun Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745672#comment-16745672
 ] 

Jun Rao commented on KAFKA-7836:


[~lindong], it seems that we could call zkClient.propagateLogDirEvent after the 
relevant partitions are marked offline, but before 
logManager.handleLogDirFailure, to speed up the propagation of log dir failure 
to the controller. Do you see any issue with that? Thanks.
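
A sketch of the reordering (illustrative Java; the real method is Scala and the 
helper names below are placeholders):

{code:java}
import java.util.Set;

// Propagate the log dir failure to the controller before the potentially slow
// file-handle cleanup, so leader election is not delayed by a bad disk.
class HandleLogDirFailureSketch {
    void handleLogDirFailure(String logDir, Set<String> affectedPartitions) {
        markPartitionsOffline(affectedPartitions);   // cheap, in-memory
        propagateLogDirEvent(logDir);                // quick ZK write; controller reacts now
        closeFileHandlesForOfflineReplicas(logDir);  // may take tens of seconds on a bad disk
    }

    void markPartitionsOffline(Set<String> partitions) { /* ... */ }

    void propagateLogDirEvent(String logDir) { /* notify the controller via ZK */ }

    void closeFileHandlesForOfflineReplicas(String logDir) { /* ... */ }
}
{code}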

> The propagation of log dir failure can be delayed due to slowness in closing 
> the file handles
> -
>
> Key: KAFKA-7836
> URL: https://issues.apache.org/jira/browse/KAFKA-7836
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Priority: Major
>
> In ReplicaManager.handleLogDirFailure(), we call 
> zkClient.propagateLogDirEvent after  logManager.handleLogDirFailure. The 
> latter closes the file handles of the offline replicas, which could take time 
> when the disk is bad. This will delay the new leader election by the 
> controller. In one incident, we have seen the closing of file handles of 
> multiple replicas taking more than 20 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7836) The propagation of log dir failure can be delayed due to slowness in closing the file handles

2019-01-17 Thread Jun Rao (JIRA)
Jun Rao created KAFKA-7836:
--

 Summary: The propagation of log dir failure can be delayed due to 
slowness in closing the file handles
 Key: KAFKA-7836
 URL: https://issues.apache.org/jira/browse/KAFKA-7836
 Project: Kafka
  Issue Type: Improvement
Reporter: Jun Rao


In ReplicaManager.handleLogDirFailure(), we call zkClient.propagateLogDirEvent 
after  logManager.handleLogDirFailure. The latter closes the file handles of 
the offline replicas, which could take time when the disk is bad. This will 
delay the new leader election by the controller. In one incident, we have seen 
the closing of file handles of multiple replicas taking more than 20 seconds.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2019-01-17 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-5488:
--

Assignee: Ivan Ponomarev

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: needs-kip
>
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> code that is hard to maintain, since you have to know the right index for an 
> unnamed branching stream.
> Example
> {code:lang=java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream[] branchedStreams = new KStreamBuilder()
>             .stream("eventTopic")
>             .branch(
>                 (k, v) -> EventType::validData,
>                 (k, v) -> true
>             );
> 
>         branchedStreams[0]
>             .to("topicValidData");
> 
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, something like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> the branch/stream code nested where it belongs,
> so it would be possible to write code like
> {code:lang=java}
> new KStreamBuilder()
>     .stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> EventType::validData,
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );
> {code}
> I'll go forward to evaluate some ideas:
> https://gitlab.com/childno.de/apache_kafka/snippets/1665655



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7829) Javadoc should show that AdminClient.alterReplicaLogDirs() is supported in Kafka 1.1.0 or later

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745634#comment-16745634
 ] 

ASF GitHub Bot commented on KAFKA-7829:
---

junrao commented on pull request #6157: KAFKA-7829; Javadoc should show that 
AdminClient.alterReplicaLogDirs() is supported in Kafka 1.1.0 or later
URL: https://github.com/apache/kafka/pull/6157
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Javadoc should show that AdminClient.alterReplicaLogDirs() is supported in 
> Kafka 1.1.0 or later
> ---
>
> Key: KAFKA-7829
> URL: https://issues.apache.org/jira/browse/KAFKA-7829
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dong Lin
>Priority: Major
>
> In ReassignPartitionsCommand, the --reassignment-json-file option says "...If 
> absolute log directory path is specified, it is currently required that the 
> replica has not already been created on that broker...". This is inaccurate 
> since we support moving existing replicas to new log dirs.
> In addition, the Javadoc of AdminClient.alterReplicaLogDirs(...) should show 
> the API is supported by brokers with version 1.1.0 or later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7829) Javadoc should show that AdminClient.alterReplicaLogDirs() is supported in Kafka 1.1.0 or later

2019-01-17 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7829?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7829.

   Resolution: Fixed
Fix Version/s: 2.2.0

Merged to trunk.

> Javadoc should show that AdminClient.alterReplicaLogDirs() is supported in 
> Kafka 1.1.0 or later
> ---
>
> Key: KAFKA-7829
> URL: https://issues.apache.org/jira/browse/KAFKA-7829
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Jun Rao
>Assignee: Dong Lin
>Priority: Major
> Fix For: 2.2.0
>
>
> In ReassignPartitionsCommand, the --reassignment-json-file option says "...If 
> absolute log directory path is specified, it is currently required that the 
> replica has not already been created on that broker...". This is inaccurate 
> since we support moving existing replicas to new log dirs.
> In addition, the Javadoc of AdminClient.alterReplicaLogDirs(...) should show 
> the API is supported by brokers with version 1.1.0 or later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7832) Use automatic RPC generation in CreateTopics

2019-01-17 Thread Dhruvil Shah (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745598#comment-16745598
 ] 

Dhruvil Shah commented on KAFKA-7832:
-

Putting the link here so the PR is easy to find: 
https://github.com/apache/kafka/pull/5972

> Use automatic RPC generation in CreateTopics
> 
>
> Key: KAFKA-7832
> URL: https://issues.apache.org/jira/browse/KAFKA-7832
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Major
>
> Use automatic RPC generation for the CreateTopics RPC.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6359) Work for KIP-236

2019-01-17 Thread Sriharsha Chintalapani (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-6359:
-

Assignee: GEORGE LI

> Work for KIP-236
> 
>
> Key: KAFKA-6359
> URL: https://issues.apache.org/jira/browse/KAFKA-6359
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: GEORGE LI
>Priority: Minor
>
> This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6359) Work for KIP-236

2019-01-17 Thread Sriharsha Chintalapani (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745566#comment-16745566
 ] 

Sriharsha Chintalapani commented on KAFKA-6359:
---

[~satish.duggana] Sorry, I didn't notice your message. [~sql_consulting] from my 
team is already making progress; sorry for the confusion here.

> Work for KIP-236
> 
>
> Key: KAFKA-6359
> URL: https://issues.apache.org/jira/browse/KAFKA-6359
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Assignee: GEORGE LI
>Priority: Minor
>
> This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6359) Work for KIP-236

2019-01-17 Thread Sriharsha Chintalapani (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6359?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sriharsha Chintalapani reassigned KAFKA-6359:
-

Assignee: (was: Tom Bentley)

> Work for KIP-236
> 
>
> Key: KAFKA-6359
> URL: https://issues.apache.org/jira/browse/KAFKA-6359
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Tom Bentley
>Priority: Minor
>
> This issue is for the work described in KIP-236.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7815) SourceTask should expose ACK'd offsets, metadata

2019-01-17 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745471#comment-16745471
 ] 

Ryanne Dolan commented on KAFKA-7815:
-

[~cricket007] given the KIP freeze for 2.2.0 in the near future, I'd rather not 
expand the scope of this KIP, but I agree that something like that is KIP-worthy :)

> SourceTask should expose ACK'd offsets, metadata
> 
>
> Key: KAFKA-7815
> URL: https://issues.apache.org/jira/browse/KAFKA-7815
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
> Fix For: 2.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Add a new callback method, recordLogged(), to notify SourceTasks when a 
> record is ACK'd by the downstream broker. Include offsets and metadata of 
> ACK'd record.
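
A possible shape for such a hook, purely as an illustration (the final signature 
would be settled by the KIP; the interface name below is hypothetical):

{code:java}
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.connect.source.SourceRecord;

// Hypothetical callback a SourceTask could implement to learn when a record has been
// acknowledged by the broker, together with the resulting offset metadata.
public interface SourceTaskAckListener {
    void recordLogged(SourceRecord record, RecordMetadata metadata);
}
{code}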



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2019-01-17 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745417#comment-16745417
 ] 

Dmitry Minkovsky edited comment on KAFKA-6817 at 1/17/19 7:37 PM:
--

[~mjsax] Is this exception co-occurring with 
https://issues.apache.org/jira/browse/KAFKA-7190? Does it only occur in low 
traffic? I am getting the warnings in KAFKA-7190 in all my applications (they 
are low traffic) but the exception/streams task shutdown that's reported here 
only in one so far. I'm wondering whether it would be better to run without EOS? 
I've not been able to reproduce the cause of this issue, and the bug leaves my 
application in a broken state that requires a reset.


was (Author: dminkovsky):
[~mjsax] Is this exception co-occurring with 
https://issues.apache.org/jira/browse/KAFKA-7190? Does it only occur in low 
traffic? I am getting the warnings in KAFKA-7190 and the exception/streams task 
shutdown that's reported here. Wondering if better to run without EOS? I've not 
been able to reproduce the cause of this issue and the bug leaves my 
application in broken state that requires reset.

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps (type 
> 1 year old). The topic we are writing to has a retention.ms set to 1 year so 
> we are expecting the messages to stay there.
> After digging through the ProducerStateManager-code in the Kafka source code 
> we have a theory of what might be wrong.
> The ProducerStateManager.removeExpiredProducers() seems to remove producers 
> from memory 

[jira] [Comment Edited] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2019-01-17 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745417#comment-16745417
 ] 

Dmitry Minkovsky edited comment on KAFKA-6817 at 1/17/19 7:35 PM:
--

[~mjsax] Is this exception co-occurring with 
https://issues.apache.org/jira/browse/KAFKA-7190? Does it only occur in low 
traffic? I am getting the warnings in KAFKA-7190 and the exception/streams task 
shutdown that's reported here. Wondering if better to run without EOS? I've not 
been able to reproduce the cause of this issue and the bug leaves my 
application in broken state that requires reset.


was (Author: dminkovsky):
[~mjsax] would this exception be co-occurring with 
https://issues.apache.org/jira/browse/KAFKA-7190? Does this only occur in low 
traffic? I am getting both the warnings in KAFKA-7190, but also the 
exception/streams task shutdown that's reported here. Wondering if better to 
run without EOS? This bug leaves my application in broken state that requires 
reset.

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps (type 
> 1 year old). The topic we are writing to has a retention.ms set to 1 year so 
> we are expecting the messages to stay there.
> After digging through the ProducerStateManager-code in the Kafka source code 
> we have a theory of what might be wrong.
> The ProducerStateManager.removeExpiredProducers() seems to remove producers 
> from memory erroneously when processing records which are older than the 
> maxProducerIdExpirationMs (coming from the 

[jira] [Commented] (KAFKA-6817) UnknownProducerIdException when writing messages with old timestamps

2019-01-17 Thread Dmitry Minkovsky (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6817?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745417#comment-16745417
 ] 

Dmitry Minkovsky commented on KAFKA-6817:
-

[~mjsax] would this exception be co-occurring with 
https://issues.apache.org/jira/browse/KAFKA-7190? Does this only occur in low 
traffic? I am getting both the warnings in KAFKA-7190, but also the 
exception/streams task shutdown that's reported here. Wondering if better to 
run without EOS? This bug leaves my application in broken state that requires 
reset.

> UnknownProducerIdException when writing messages with old timestamps
> 
>
> Key: KAFKA-6817
> URL: https://issues.apache.org/jira/browse/KAFKA-6817
> Project: Kafka
>  Issue Type: Bug
>  Components: producer 
>Affects Versions: 1.1.0
>Reporter: Odin Standal
>Priority: Major
>
> We are seeing the following exception in our Kafka application: 
> {code:java}
> ERROR o.a.k.s.p.internals.StreamTask - task [0_0] Failed to close producer 
> due to the following error: org.apache.kafka.streams.errors.StreamsException: 
> task [0_0] Abort sending since an error caught with a previous record (key 
> 22 value some-value timestamp 1519200902670) to topic 
> exactly-once-test-topic- v2 due to This exception is raised by the broker if 
> it could not locate the producer metadata associated with the producerId in 
> question. This could happen if, for instance, the producer's records were 
> deleted because their retention time had elapsed. Once the last records of 
> the producerId are removed, the producer's metadata is removed from the 
> broker, and future appends by the producer will return this exception. at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.recordSendError(RecordCollectorImpl.java:125)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.access$500(RecordCollectorImpl.java:48)
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:180)
>  at 
> org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:1199)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
>  at 
> org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) 
> at 
> org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481)
>  at 
> org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:74) 
> at 
> org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:692)
>  at 
> org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) 
> at 
> org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482)
>  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) at 
> java.lang.Thread.run(Thread.java:748) Caused by: 
> org.apache.kafka.common.errors.UnknownProducerIdException
> {code}
> We discovered this error when we had the need to reprocess old messages. See 
> more details on 
> [Stackoverflow|https://stackoverflow.com/questions/49872827/unknownproduceridexception-in-kafka-streams-when-enabling-exactly-once?noredirect=1#comment86901917_49872827]
> We have reproduced the error with a smaller example application. The error 
> occurs after 10 minutes of producing messages that have old timestamps (type 
> 1 year old). The topic we are writing to has a retention.ms set to 1 year so 
> we are expecting the messages to stay there.
> After digging through the ProducerStateManager-code in the Kafka source code 
> we have a theory of what might be wrong.
> The ProducerStateManager.removeExpiredProducers() seems to remove producers 
> from memory erroneously when processing records which are older than the 
> maxProducerIdExpirationMs (coming from the `transactional.id.expiration.ms` 
> configuration), which is set by default to 7 days. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7833) StreamsBuilder should throw an exception if addGlobalStore and addStateStore is called for the same store builder

2019-01-17 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7833?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745364#comment-16745364
 ] 

Matthias J. Sax commented on KAFKA-7833:


{quote}The problem is with the store name: same store names are not expected.
{quote}
That is the exact problem. The store name is returned by the StoreBuilder, and 
thus adding the same StoreBuilder twice results in a naming conflict that is 
not detected atm.
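
A minimal illustration of why this collides: the name travels with the builder, 
so registering the same builder via both addGlobalStore() and addStateStore() 
yields two stores with the same name (sketch assuming kafka-streams on the 
classpath):

{code:java}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;

public class StoreNameReuse {
    public static void main(String[] args) {
        StoreBuilder<KeyValueStore<String, String>> storeBuilder =
                Stores.keyValueStoreBuilder(
                        Stores.inMemoryKeyValueStore("global-store"),
                        Serdes.String(), Serdes.String());

        // The same builder always carries the same name, so passing it to both
        // addGlobalStore() and addStateStore() registers two stores called
        // "global-store", which StreamsBuilder currently does not reject.
        System.out.println(storeBuilder.name()); // global-store
        System.out.println(storeBuilder.name()); // global-store
    }
}
{code}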

> StreamsBuilder should throw an exception if addGlobalStore and addStateStore 
> is called for the same store builder
> -
>
> Key: KAFKA-7833
> URL: https://issues.apache.org/jira/browse/KAFKA-7833
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Sam Lendle
>Priority: Major
>
> {{StoreBuilder> storeBuilder =}}
> {{ Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore("global-store"), 
> null, null);}}
> {{ builder.addGlobalStore(}}
> {{ storeBuilder,}}
> {{ "global-topic",}}
> {{ Consumed.with(null, null),}}
> {{ new KTableSource(storeBuilder.name())}}
> {{ );}}
> {{builder.addStateStore(storeBuilder); }}
> {{builder.build();}}
>  
>  
> Does not throw an exception.
>  
> cc [~mjsax]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7738) Track partition leader epochs in client metadata

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745355#comment-16745355
 ] 

ASF GitHub Bot commented on KAFKA-7738:
---

hachikuji commented on pull request #6045: KAFKA-7738 Track leader epochs in 
Metadata
URL: https://github.com/apache/kafka/pull/6045
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Track partition leader epochs in client metadata
> 
>
> Key: KAFKA-7738
> URL: https://issues.apache.org/jira/browse/KAFKA-7738
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
>
> The Metadata API now exposes the current leader epoch of each partition. We 
> can leverage this information to be smarter in how we fetch metadata. For 
> example, when we receive a NOT_LEADER_FOR_PARTITION, we know to look for an 
> epoch bump before resuming whatever operation we were trying to do. 
> Additionally, this gives us a way to detect stale metadata.
> This requires a little more sophistication in how we track metadata in the 
> client. For example, we may not be able to assume metadata updates are 
> monotonic. In other words, they may mix stale metadata for one partition and 
> fresh metadata for another. I am not sure we have any strong guarantees on 
> the order in which metadata updates are seen on each broker.
> It may be helpful in this context to control the topics that we fetch 
> metadata for at a finer granularity. Potentially we could even extend the 
> Metadata API to specify a subset of the partitions that the client is 
> interested in.
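
A toy sketch of what the epoch tracking can look like on the client side 
(illustrative only; the actual change is in the linked PR): only accept a 
metadata update for a partition if its leader epoch is at least as new as the 
cached one.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Rejects per-partition metadata whose leader epoch is older than what the client
// already knows, which is one way to cope with non-monotonic metadata responses.
class LeaderEpochCache {
    private final Map<String, Integer> epochByPartition = new HashMap<>();

    boolean maybeUpdate(String topicPartition, int leaderEpoch) {
        Integer cached = epochByPartition.get(topicPartition);
        if (cached != null && leaderEpoch < cached) {
            return false; // stale metadata for this partition; keep the cached view
        }
        epochByPartition.put(topicPartition, leaderEpoch);
        return true;
    }
}
{code}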



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7738) Track partition leader epochs in client metadata

2019-01-17 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-7738.

   Resolution: Fixed
Fix Version/s: 2.2.0

> Track partition leader epochs in client metadata
> 
>
> Key: KAFKA-7738
> URL: https://issues.apache.org/jira/browse/KAFKA-7738
> Project: Kafka
>  Issue Type: Improvement
>Reporter: David Arthur
>Assignee: David Arthur
>Priority: Minor
> Fix For: 2.2.0
>
>
> The Metadata API now exposes the current leader epoch of each partition. We 
> can leverage this information to be smarter in how we fetch metadata. For 
> example, when we receive a NOT_LEADER_FOR_PARTITION, we know to look for an 
> epoch bump before resuming whatever operation we were trying to do. 
> Additionally, this gives us a way to detect stale metadata.
> This requires a little more sophistication in how we track metadata in the 
> client. For example, we may not be able to assume that metadata updates are 
> monotonic. In other words, they may mix stale metadata for one partition and 
> fresh metadata for another. I am not sure we have any strong guarantees on 
> the order in which metadata updates are seen on each broker.
> It may be helpful in this context to control the topics that we fetch 
> metadata for at a finer granularity. Potentially we could even extend the 
> Metadata API to specify a subset of the partitions that the client is 
> interested in.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7815) SourceTask should expose ACK'd offsets, metadata

2019-01-17 Thread Jordan Moore (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745281#comment-16745281
 ] 

Jordan Moore commented on KAFKA-7815:
-

So, I was thinking about adding something like "recordLogged" to SinkTasks as 
well - a callback indicating that the record reached the sink.

I understand that might need a new KIP, but could the scope be expanded to 
include that?

One use case: with the HDFS / S3 connector, when a file is written, I want a 
callback so I can register a Hive Metastore partition.
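
A rough sketch of what such a sink-side hook could look like; the recordLogged name and its signature are assumptions for discussion, not an existing Connect API:
{code:java}
import java.util.Map;

import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical extension point, for discussion only; SinkTask does not define this today.
public abstract class AckAwareSinkTask extends SinkTask {

    // Would be invoked by the framework once a record has durably reached the sink,
    // e.g. after the HDFS/S3 connector has closed a file, so the task can perform a
    // follow-up action such as registering a Hive Metastore partition.
    public void recordLogged(final SinkRecord record, final Map<String, String> metadata) {
        // default: no-op
    }
}
{code}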

> SourceTask should expose ACK'd offsets, metadata
> 
>
> Key: KAFKA-7815
> URL: https://issues.apache.org/jira/browse/KAFKA-7815
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
> Fix For: 2.2.0
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Add a new callback method, recordLogged(), to notify SourceTasks when a 
> record is ACK'd by the downstream broker. Include offsets and metadata of 
> ACK'd record.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7835) Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) call

2019-01-17 Thread Suraj Fale (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suraj Fale updated KAFKA-7835:
--
Description: 
We have 6 connectors under a distributed Kafka Connect worker. Each of these 
connectors has 18-36 tasks. Each time we add a new connector, all existing 
connectors restart/re-balance. 

*Issue:*
 * For each new connector we add, the previous/existing connector(s) restart 
along with the new connector.
 * All existing connectors and their tasks go into re-balance mode.

Assuming we want to add 3 new connectors in one flow, how can we do that in one 
HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to wait 
for the re-balance to finish and for all tasks to be up again across all 
instances. 

Is there any way to create all 3 connectors for the same distributed worker in 
one HTTP call?

*If not, can we add this as an improvement?*

Something like
{code:java}
##POST http://localhost:8083/connectors
[
  {
 "name": "connector-1",
 "config": {
  "connector.class": "Connector1Class",
  "tasks.max": "36",
  "run.next.start": "5",
  "fetchsize": "10",
  "topic": "message.connector-1",
  "offset.start": "0"
 }
  },
  {
  "name": "connector-2",
  "config": {
   "connector.class": "Connector2Class",
   "tasks.max": "18",
   "run.next.start": "5",
   "fetchsize": "10",
   "topic": "message.connector-2",
   "offset.start": "0"
  }
  },
  {
  "name": "connector-3",
  "config": {
   "connector.class": "Connector3Class",
   "tasks.max": "18",
   "run.next.start": "5",
   "fetchsize": "10",
   "topic": "message.connector-3",
   "offset.start": "0"
  }
  }
]
{code}

  was:
We have 6 connectors under distributed Kafka worker. Each of these connectors 
has 18-36 tasks. Each time we add new connector all existing connectors 
restarts/re-balance. 

*Issue:*
 * For each new connector we add, previous/existing connector(s) restarts along 
with new connector.
 * All existing connectors and their tasks goes into re-balance mode.

Assuming that we want to add 3 new connectors in one flow or call, how can we 
do that in one HTTP call rather than 3 HTTP POST calls. Because after each HTTP 
call we have to wait for re-balance to finish and wait for all tasks to be up 
again across all instances. 

Is there any way to create all these 3 connectors for same distributed worker 
in one HTTP call?

*If not, can we add this as a improvement?*

Something like
{code:java}
## http://localhost:8083/connectors
[
  {
 "name": "connector-1",
 "config": {
  "connector.class": "Connector1Class",
  "tasks.max": "36",
  "run.next.start": "5",
  "fetchsize": "10",
  "topic": "message.connector-1",
  "offset.start": "0"
 }
  },
  {
  "name": "connector-2",
  "config": {
   "connector.class": "Connector2Class",
   "tasks.max": "18",
   "run.next.start": "5",
   "fetchsize": "10",
   "topic": "message.connector-2",
   "offset.start": "0"
  }
  },
  {
  "name": "connector-3",
  "config": {
   "connector.class": "Connector3Class",
   "tasks.max": "18",
   "run.next.start": "5",
   "fetchsize": "10",
   "topic": "message.connector-3",
   "offset.start": "0"
  }
  }
]
{code}


> Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) 
> call
> -
>
> Key: KAFKA-7835
> URL: https://issues.apache.org/jira/browse/KAFKA-7835
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.1.1
>Reporter: Suraj Fale
>Priority: Major
>  Labels: kafka-connect
>
> We have 6 connectors under a distributed Kafka Connect worker. Each of these 
> connectors has 18-36 tasks. Each time we add a new connector, all existing 
> connectors restart/re-balance. 
> *Issue:*
>  * For each new connector we add, the previous/existing connector(s) restart 
> along with the new connector.
>  * All existing connectors and their tasks go into re-balance mode.
> Assuming we want to add 3 new connectors in one flow, how can we do that in 
> one HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to 
> wait for the re-balance to finish and for all tasks to be up again across 
> all instances. 
> Is there any way to create all 3 connectors for the same distributed worker 
> in one HTTP call?
> *If not, can we add this as an improvement?*
> Something like
> {code:java}
> ##POST http://localhost:8083/connectors
> [
>   {
>  "name": 

[jira] [Updated] (KAFKA-7835) Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) call

2019-01-17 Thread Suraj Fale (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suraj Fale updated KAFKA-7835:
--
Description: 
We have 6 connectors under a distributed Kafka Connect worker. Each of these 
connectors has 18-36 tasks. Each time we add a new connector, all existing 
connectors restart/re-balance. 

*Issue:*
 * For each new connector we add, the previous/existing connector(s) restart 
along with the new connector.
 * All existing connectors and their tasks go into re-balance mode.

Assuming we want to add 3 new connectors in one flow, how can we do that in one 
HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to wait 
for the re-balance to finish and for all tasks to be up again across all 
instances. 

Is there any way to create all 3 connectors for the same distributed worker in 
one HTTP call?

*If not, can we add this as an improvement?*

Something like
{code:java}
## http://localhost:8083/connectors
[
  {
 "name": "connector-1",
 "config": {
  "connector.class": "Connector1Class",
  "tasks.max": "36",
  "run.next.start": "5",
  "fetchsize": "10",
  "topic": "message.connector-1",
  "offset.start": "0"
 }
  },
  {
  "name": "connector-2",
  "config": {
   "connector.class": "Connector2Class",
   "tasks.max": "18",
   "run.next.start": "5",
   "fetchsize": "10",
   "topic": "message.connector-2",
   "offset.start": "0"
  }
  },
  {
  "name": "connector-3",
  "config": {
   "connector.class": "Connector3Class",
   "tasks.max": "18",
   "run.next.start": "5",
   "fetchsize": "10",
   "topic": "message.connector-3",
   "offset.start": "0"
  }
  }
]
{code}

  was:
We have 6 connectors under distributed Kafka worker. Each of these connectors 
has 18-36 tasks. Each time we add new connector all existing connectors 
restarts/re-balance. 

*Issue:*
 * For each new connector we add, previous/existing connector(s) restarts along 
with new connector.
 * All existing connectors and their tasks goes into re-balance mode.

Assuming that we want to add 3 new connectors in one flow or call, how can we 
do that in one HTTP call rather than 3 HTTP POST calls. Because after each HTTP 
call we have to wait for re-balance to finish and wait for all tasks to be up 
again across all instances. 

Is there any way to create all these 3 connectors for same distributed worker 
in one HTTP call?

*If not, can we add this as a improvement?*

Something like
{code:java}
[
 {
 "name": "connector-1",
"config": {
"connector.class": "Connector1Class",
"tasks.max": "36",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-1",
"offset.start": "0"
}
},
{
"name": "connector-2",
"config": {
"connector.class": "Connector2Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-2",
"offset.start": "0"
}
},
{
"name": "connector-3",
"config": {
"connector.class": "Connector3Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-3",
"offset.start": "0"
}
}
]
{code}


> Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) 
> call
> -
>
> Key: KAFKA-7835
> URL: https://issues.apache.org/jira/browse/KAFKA-7835
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.1.1
>Reporter: Suraj Fale
>Priority: Major
>  Labels: kafka-connect
>
> We have 6 connectors under a distributed Kafka Connect worker. Each of these 
> connectors has 18-36 tasks. Each time we add a new connector, all existing 
> connectors restart/re-balance. 
> *Issue:*
>  * For each new connector we add, the previous/existing connector(s) restart 
> along with the new connector.
>  * All existing connectors and their tasks go into re-balance mode.
> Assuming we want to add 3 new connectors in one flow, how can we do that in 
> one HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to 
> wait for the re-balance to finish and for all tasks to be up again across 
> all instances. 
> Is there any way to create all 3 connectors for the same distributed worker 
> in one HTTP call?
> *If not, can we add this as an improvement?*
> Something like
> {code:java}
> ## http://localhost:8083/connectors
> [
>   {
>  "name": "connector-1",
>  "config": {
>   "connector.class": "Connector1Class",
>   "tasks.max": "36",
>   "run.next.start": "5",
>   "fetchsize": "10",
>   "topic": "message.connector-1",
>   "offset.start": "0"
>  }
>   },
>   {
>   "name": 

[jira] [Updated] (KAFKA-7835) Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) call

2019-01-17 Thread Suraj Fale (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suraj Fale updated KAFKA-7835:
--
Description: 
We have 6 connectors under a distributed Kafka Connect worker. Each of these 
connectors has 18-36 tasks. Each time we add a new connector, all existing 
connectors restart/re-balance. 

*Issue:*
 * For each new connector we add, the previous/existing connector(s) restart 
along with the new connector.
 * All existing connectors and their tasks go into re-balance mode.

Assuming we want to add 3 new connectors in one flow, how can we do that in one 
HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to wait 
for the re-balance to finish and for all tasks to be up again across all 
instances. 

Is there any way to create all 3 connectors for the same distributed worker in 
one HTTP call?

*If not, can we add this as an improvement?*

Something like
{code:java}
[
 {
 "name": "connector-1",
"config": {
"connector.class": "Connector1Class",
"tasks.max": "36",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-1",
"offset.start": "0"
}
},
{
"name": "connector-2",
"config": {
"connector.class": "Connector2Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-2",
"offset.start": "0"
}
},
{
"name": "connector-3",
"config": {
"connector.class": "Connector3Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-3",
"offset.start": "0"
}
}
]
{code}

  was:
We have 6 connectors under distributed Kafka worker. Each of these connectors 
has 18-36 tasks. Each time we add new connector all existing connectors 
restarts/re-balance. 

*Issue:*
 * For each new connector we add, previous/existing connector(s) restarts along 
with new connector.
 * All existing connectors and their tasks goes into re-balance mode.

Assuming that we want to add 3 new connectors in one flow or call, how can we 
do that in one HTTP call rather than 3 HTTP POST calls. Because after each HTTP 
call we have to wait for re-balance to finish and wait for all tasks to be up 
again across all instances. 

Is there any way to create all these 3 connectors for same distributed worker 
in one HTTP call?

*If not, can we add this as a improvement?*

Something like
{code:java}
## http://localhost:8083/connectors
[
{
"name": "connector-1",
"config": {
"connector.class": "Connector1Class",
"tasks.max": "36",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-1",
"offset.start": "0"
}
},
{
"name": "connector-2",
"config": {
"connector.class": "Connector2Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-2",
"offset.start": "0"
}
},
{
"name": "connector-3",
"config": {
"connector.class": "Connector3Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-3",
"offset.start": "0"
}
}
]
{code}


> Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) 
> call
> -
>
> Key: KAFKA-7835
> URL: https://issues.apache.org/jira/browse/KAFKA-7835
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.1.1
>Reporter: Suraj Fale
>Priority: Major
>  Labels: kafka-connect
>
> We have 6 connectors under a distributed Kafka Connect worker. Each of these 
> connectors has 18-36 tasks. Each time we add a new connector, all existing 
> connectors restart/re-balance. 
> *Issue:*
>  * For each new connector we add, the previous/existing connector(s) restart 
> along with the new connector.
>  * All existing connectors and their tasks go into re-balance mode.
> Assuming we want to add 3 new connectors in one flow, how can we do that in 
> one HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to 
> wait for the re-balance to finish and for all tasks to be up again across 
> all instances. 
> Is there any way to create all 3 connectors for the same distributed worker 
> in one HTTP call?
> *If not, can we add this as an improvement?*
> Something like
> {code:java}
> [
>  {
>  "name": "connector-1",
> "config": {
> "connector.class": "Connector1Class",
> "tasks.max": "36",
> "run.next.start": "5",
> "fetchsize": "10",
> "topic": "message.connector-1",
> "offset.start": "0"
> }
> },
> {
> "name": "connector-2",
> "config": {
> "connector.class": "Connector2Class",
> "tasks.max": "18",
> "run.next.start": "5",
> "fetchsize": "10",
> "topic": "message.connector-2",
> "offset.start": "0"
> }
> },
> {
> "name": "connector-3",
> "config": {
> "connector.class": "Connector3Class",
> "tasks.max": "18",
> "run.next.start": "5",
> "fetchsize": "10",
> "topic": 

[jira] [Updated] (KAFKA-7835) Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) call

2019-01-17 Thread Suraj Fale (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Suraj Fale updated KAFKA-7835:
--
Description: 
We have 6 connectors under a distributed Kafka Connect worker. Each of these 
connectors has 18-36 tasks. Each time we add a new connector, all existing 
connectors restart/re-balance. 

*Issue:*
 * For each new connector we add, the previous/existing connector(s) restart 
along with the new connector.
 * All existing connectors and their tasks go into re-balance mode.

Assuming we want to add 3 new connectors in one flow, how can we do that in one 
HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to wait 
for the re-balance to finish and for all tasks to be up again across all 
instances. 

Is there any way to create all 3 connectors for the same distributed worker in 
one HTTP call?

*If not, can we add this as an improvement?*

Something like
{code:java}
## http://localhost:8083/connectors
[
{
"name": "connector-1",
"config": {
"connector.class": "Connector1Class",
"tasks.max": "36",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-1",
"offset.start": "0"
}
},
{
"name": "connector-2",
"config": {
"connector.class": "Connector2Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-2",
"offset.start": "0"
}
},
{
"name": "connector-3",
"config": {
"connector.class": "Connector3Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-3",
"offset.start": "0"
}
}
]
{code}

  was:
We have 6 connectors under distributed Kafka worker. Each of this connectors 
has 18-36 tasks. Each time we add new connector all existing connector 
restarts/re-balance. 

*Issue:*
 * For each new connector we add, previous/existing connector(s) restarts along 
with new connector.
 * All existing connectors and their tasks goes into re-balance mode.

Assuming that we want to add 3 new connectors in one flow, right now we have to 
make 3 Http call. And thus it existing all connectors restarts 3 times.

Is there any way to create all these 3 connectors for same distributed worker 
in one HTTP call?

 *If not can we add this as a improvement?*

Something like
{code:java}
## http://localhost:8083/connectors
[
{
"name": "connector-1",
"config": {
"connector.class": "Connector1Class",
"tasks.max": "36",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-1",
"offset.start": "0"
}
},
{
"name": "connector-2",
"config": {
"connector.class": "Connector2Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-2",
"offset.start": "0"
}
},
{
"name": "connector-3",
"config": {
"connector.class": "Connector3Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-3",
"offset.start": "0"
}
}
]
{code}


> Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) 
> call
> -
>
> Key: KAFKA-7835
> URL: https://issues.apache.org/jira/browse/KAFKA-7835
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 1.1.1
>Reporter: Suraj Fale
>Priority: Major
>  Labels: kafka-connect
>
> We have 6 connectors under a distributed Kafka Connect worker. Each of these 
> connectors has 18-36 tasks. Each time we add a new connector, all existing 
> connectors restart/re-balance. 
> *Issue:*
>  * For each new connector we add, the previous/existing connector(s) restart 
> along with the new connector.
>  * All existing connectors and their tasks go into re-balance mode.
> Assuming we want to add 3 new connectors in one flow, how can we do that in 
> one HTTP call rather than 3 HTTP POST calls? After each HTTP call we have to 
> wait for the re-balance to finish and for all tasks to be up again across 
> all instances. 
> Is there any way to create all 3 connectors for the same distributed worker 
> in one HTTP call?
> *If not, can we add this as an improvement?*
> Something like
> {code:java}
> ## http://localhost:8083/connectors
> [
> {
> "name": "connector-1",
> "config": {
> "connector.class": "Connector1Class",
> "tasks.max": "36",
> "run.next.start": "5",
> "fetchsize": "10",
> "topic": "message.connector-1",
> "offset.start": "0"
> }
> },
> {
> "name": "connector-2",
> "config": {
> "connector.class": "Connector2Class",
> "tasks.max": "18",
> "run.next.start": "5",
> "fetchsize": "10",
> "topic": "message.connector-2",
> "offset.start": "0"
> }
> },
> {
> "name": "connector-3",
> "config": {
> "connector.class": "Connector3Class",
> "tasks.max": "18",
> "run.next.start": "5",
> "fetchsize": "10",
> "topic": "message.connector-3",
> "offset.start": "0"
> }
> }
> 

[jira] [Created] (KAFKA-7835) Kafka Connect - Unable to create multiple connectors in a single HTTP (REST) call

2019-01-17 Thread Suraj Fale (JIRA)
Suraj Fale created KAFKA-7835:
-

 Summary: Kafka Connect - Unable to create multiple connectors in a 
single HTTP (REST) call
 Key: KAFKA-7835
 URL: https://issues.apache.org/jira/browse/KAFKA-7835
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 1.1.1
Reporter: Suraj Fale


We have 6 connectors under a distributed Kafka Connect worker. Each of these 
connectors has 18-36 tasks. Each time we add a new connector, all existing 
connectors restart/re-balance. 

*Issue:*
 * For each new connector we add, the previous/existing connector(s) restart 
along with the new connector.
 * All existing connectors and their tasks go into re-balance mode.

Assuming we want to add 3 new connectors in one flow, right now we have to make 
3 HTTP calls, and thus all existing connectors restart 3 times.

Is there any way to create all 3 connectors for the same distributed worker in 
one HTTP call?

 *If not, can we add this as an improvement?*

Something like
{code:java}
## http://localhost:8083/connectors
[
{
"name": "connector-1",
"config": {
"connector.class": "Connector1Class",
"tasks.max": "36",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-1",
"offset.start": "0"
}
},
{
"name": "connector-2",
"config": {
"connector.class": "Connector2Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-2",
"offset.start": "0"
}
},
{
"name": "connector-3",
"config": {
"connector.class": "Connector3Class",
"tasks.max": "18",
"run.next.start": "5",
"fetchsize": "10",
"topic": "message.connector-3",
"offset.start": "0"
}
}
]
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7075) Allow Topology#addGlobalStore to add a window store

2019-01-17 Thread Tommy Becker (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745164#comment-16745164
 ] 

Tommy Becker commented on KAFKA-7075:
-

[~lmontrieux] I'm curious what your use-case is. After experimenting with 
addGlobalStore(), I'm not sure what value it has. At first glance it seems to 
allow you to plug in a custom processor that allows you to maintain your global 
store however you wish. But the initial population of the store does not go 
through this processor, so if you want to do anything beyond loading the exact 
contents of the topic, you're out of luck. This method actually seems dangerous 
because if your custom processor does not use the keys from the topic verbatim, 
you'll get all manner of weird behavior.
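
To make the pitfall concrete, here is a sketch of a global-store update processor (the store name and types are assumptions); because the initial population bypasses process(), the processor has to write each record under the exact key read from the topic, or the restored state and the processed state will diverge:
{code:java}
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// Illustrative sketch of a processor passed to addGlobalStore(); names and types
// are assumptions. The only safe thing it can do is mirror the topic:
// put(key, value) with the key used verbatim.
class GlobalStoreUpdater extends AbstractProcessor<String, String> {
    private KeyValueStore<String, String> store;

    @Override
    @SuppressWarnings("unchecked")
    public void init(final ProcessorContext context) {
        super.init(context);
        store = (KeyValueStore<String, String>) context.getStateStore("global-store");
    }

    @Override
    public void process(final String key, final String value) {
        store.put(key, value); // must use the topic key verbatim
    }
}
{code}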

> Allow Topology#addGlobalStore to add a window store
> ---
>
> Key: KAFKA-7075
> URL: https://issues.apache.org/jira/browse/KAFKA-7075
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Nishanth Pradeep
>Priority: Major
>  Labels: newbie
>
> Today although {{Topology#addGlobalStore}} can take any {{StateStore}} types, 
> the internal implementation {{InternalTopologyBuilder#addGlobalStore}} only 
> accepts {{StoreBuilder}}. It means if users pass in a windowed 
> store builder in {{Topology#addGlobalStore}} it will cause a runtime 
> ClassCastException.
> We should fix this issue by relaxing the {{InternalTopologyBuilder}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745176#comment-16745176
 ] 

ASF GitHub Bot commented on KAFKA-5488:
---

inponomarev commented on pull request #6164: KAFKA-5488: A method-chaining way 
to build branches for topology
URL: https://github.com/apache/kafka/pull/6164
 
 
   The `KStream.branch` method uses varargs to supply predicates and returns an 
array of streams ('[Each stream in the result array corresponds position-wise 
(index) to the predicate in the supplied 
predicates](http://home.apache.org/~ijuma/kafka-0.10.0.1-rc1/javadoc/org/apache/kafka/streams/kstream/KStream.html)').
 
   
   This is poor API design that makes building branches very inconvenient 
because of the 'impedance mismatch' between arrays and generics in the Java 
language.
   
* In general, the code has low cohesion: we need to define predicates in 
one place, and the respective stream processors in another place in the code. 

* If the number of predicates is predefined, this method forces us to use 
'magic numbers' to extract the right branch from the result (see examples 
[here](https://stackoverflow.com/questions/48950580/kafka-streams-send-on-different-topics-depending-on-streams-data)).

* If we need to build branches dynamically (e. g. one branch per enum 
value) we inevitably have to deal with 'generic arrays' and 'unchecked 
typecasts'.

   The proposed class `KafkaStreamsBrancher` introduces a new, standard way to 
build branches on top of a KStream. 
   
   Instead of 
   
   ```java
   KStream<String, String> source_o365_user_activity = builder.stream("source");
   KStream<String, String>[] branches = source_o365_user_activity.branch( 
 (key, value) -> value.contains("A"),
 (key, value) -> value.contains("B"),
 (key, value) -> true
);
   
   branches[0].to("A");
   branches[1].to("B");
   branches[2].to("C");
   ```
   
   we can use
   
   ```java
   new KafkaStreamsBrancher<String, String>()
  .addBranch((key, value) -> value.contains("A"), ks->ks.to("A"))
  .addBranch((key, value) -> value.contains("B"), ks->ks.to("B"))
  //default branch should not necessarily be defined in the end!
  .addDefaultBranch(ks->ks.to("C"))
  .onTopOf(builder.stream("source"))
   ```
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Marcel "childNo͡.de" Trautwein
>Priority: Major
>  Labels: needs-kip
>
> Long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code that is hard to maintain, since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:lang=java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
> public static void main(String... args) {
> KStream[] branchedStreams = new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> (k, v) -> EventType.validData(v),
> (k, v) -> true
> );
> 
> branchedStreams[0]
> .to("topicValidData");
> 
> branchedStreams[1]
> .to("topicInvalidData");
> }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition, 
> Consumer>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:lang=java}
> new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> Branch.create(
> (k, v) -> EventType::validData,
> stream -> stream.to("topicValidData")
> ),
> Branch.create(
> (k, v) -> true,
> stream -> stream.to("topicInvalidData")
> )
> );
> {code}
> I'll go forward to evaluate some ideas:
> 

[jira] [Commented] (KAFKA-7149) Reduce assignment data size to improve kafka streams scalability

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16745082#comment-16745082
 ] 

ASF GitHub Bot commented on KAFKA-7149:
---

brary commented on pull request #5663: KAFKA-7149 Reduce assignment data size 
URL: https://github.com/apache/kafka/pull/5663
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce assignment data size to improve kafka streams scalability
> 
>
> Key: KAFKA-7149
> URL: https://issues.apache.org/jira/browse/KAFKA-7149
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Ashish Surana
>Assignee: Navinder Brar
>Priority: Major
> Fix For: 2.2.0
>
>
> We observed that when we have a high number of partitions, instances or 
> stream-threads, the assignment-data size grows too fast and we start getting a 
> RecordTooLargeException at the Kafka broker.
> A workaround for this issue is described at: 
> https://issues.apache.org/jira/browse/KAFKA-6976
> Still, it limits the scalability of Kafka Streams, as moving around 100 MB of 
> assignment data for each rebalance affects performance and reliability 
> (timeout exceptions start appearing) as well. This also limits Kafka Streams 
> scale even with a high max.message.bytes setting, as the data size increases 
> quickly with the number of partitions, instances or stream-threads.
>  
> Solution:
> To address this issue in our cluster, we are sending compressed 
> assignment data. We saw the assignment-data size reduced by 8x-10x. This 
> improved Kafka Streams scalability drastically for us, and we can now run it 
> with more than 8,000 partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744874#comment-16744874
 ] 

ASF GitHub Bot commented on KAFKA-7641:
---

stanislavkozlovski commented on pull request #6163: KAFKA-7641: Introduce 
"consumer.group.max.size" config to limit consumer group sizes
URL: https://github.com/apache/kafka/pull/6163
 
 
   This patch introduces a new config - "consumer.group.max.size", which caps 
the maximum size any consumer group can reach. It has a default value of 
Int.MAX_VALUE.
   Once a consumer group is at the maximum size, subsequent JoinGroup requests 
receive a MAX_SIZE_REACHED error.
   
   In the case where the config is changed and a Coordinator broker with the 
new config loads an old group that is over the threshold, members are kicked 
out of the group and a rebalance is forced.
   
   I have added two integration tests for both scenarios - a member joining an 
already-full group and a rolling restart with a new config
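
   As a rough sketch of the two behaviours described above (the names below are illustrative, not the actual GroupCoordinator code):
   ```java
   import java.util.Collections;
   import java.util.List;

   // Illustrative only: reject joins once the group is at the configured cap, and
   // compute which members of an over-sized loaded group would be evicted before
   // forcing a rebalance.
   final class GroupSizeGuard {
       private final int maxSize; // value of "consumer.group.max.size"

       GroupSizeGuard(final int maxSize) {
           this.maxSize = maxSize;
       }

       boolean mayJoin(final int currentMembers) {
           return currentMembers < maxSize; // otherwise answer with a MAX_SIZE_REACHED-style error
       }

       List<String> membersToEvictOnLoad(final List<String> memberIds) {
           return memberIds.size() <= maxSize
               ? Collections.emptyList()
               : memberIds.subList(maxSize, memberIds.size());
       }
   }
   ```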
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Stanislav Kozlovski
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason concluded an edge case of current consumer protocol which could cause 
> memory burst on broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually become quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized JoinGroup requests can spam the group metadata, we 
> should define a cap on the broker side to prevent one consumer group from 
> growing too large. So far I feel it's appropriate to introduce this as a 
> server config, since most of the time this value only deals with error 
> scenarios; client users shouldn't need to worry about this config.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7520) Adding possibility to configure versions in Mirror Maker ducktape test

2019-01-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744861#comment-16744861
 ] 

ASF GitHub Bot commented on KAFKA-7520:
---

akatona84 commented on pull request #5818: KAFKA-7520: Possibility to configure 
versions in Mirror Maker ducktape test
URL: https://github.com/apache/kafka/pull/5818
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Adding possibility to configure versions in Mirror Maker ducktape test
> --
>
> Key: KAFKA-7520
> URL: https://issues.apache.org/jira/browse/KAFKA-7520
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Andras Katona
>Assignee: Andras Katona
>Priority: Minor
>
> Currently it is testing the current (dev) version only. It would be nice to 
> test MirrorMaker between different types of brokers, for example.
> Test: {{tests/kafkatest/tests/core/mirror_maker_test.py}}
> Test service: {{tests/kafkatest/services/mirror_maker.py}}
> This ticket is for extending the MM test service and modifying the test itself 
> so that it can be configured with versions other than DEV, without changing 
> the test's behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7520) Adding possibility to configure versions in Mirror Maker ducktape test

2019-01-17 Thread Andras Katona (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona resolved KAFKA-7520.
--
Resolution: Won't Fix

> Adding possibility to configure versions in Mirror Maker ducktape test
> --
>
> Key: KAFKA-7520
> URL: https://issues.apache.org/jira/browse/KAFKA-7520
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Andras Katona
>Assignee: Andras Katona
>Priority: Minor
>
> Currently it is testing the current (dev) version only. It would be nice to 
> test MirrorMaker between different types of brokers, for example.
> Test: {{tests/kafkatest/tests/core/mirror_maker_test.py}}
> Test service: {{tests/kafkatest/services/mirror_maker.py}}
> This ticket is for extending the MM test service and modifying the test itself 
> so that it can be configured with versions other than DEV, without changing 
> the test's behaviour.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-4113) Allow KTable bootstrap

2019-01-17 Thread Edmondo Porcu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-4113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16744779#comment-16744779
 ] 

Edmondo Porcu commented on KAFKA-4113:
--

Sure, more precisely, we are running a KTable-KTable Kafka Streams app which 
performs a join of two compacted topics: they have the same number of partitions 
and the same key, and when the app is "running" and we get new items on the 
topics, everything works fine.

 

Since our brokers are still on 0.11, the app sometimes crashes with an 
OutOfOrderException, and as it restarts, since we have no local storage, it 
consumes the whole changelog. When this happens, we see some join failures at 
startup, i.e. data that we know (and have checked) exists with the correct 
timestamps on both topics but does not trigger a join output. 

 

We performed the following checks:
 # The data is in the topics at the right time for both the left and right side, 
with the right timestamp.
 # A missed join can be re-triggered by making either the left side or the right 
side tick again.
 # In the end, since one of the two KTables comes from a separate KStream-KStream 
join app consuming from a topic produced by Kafka Connect, we end up 
updating the timestamp columns in the database to solve the problem.
 # Note that in point 1 we have verified that the data is always available in 
the KTables, so the join mentioned in item 3 of this list works correctly 
and is executed in a separate app. The one failing is the KTable-KTable join. 

 

The impression is that when the OutOfOrderException occurs and the app restarts, 
one of the two topics is consumed more quickly than the other (one of the two 
topics is much larger in terms of data size) and therefore the lookup of the 
inner join fails. 

> Allow KTable bootstrap
> --
>
> Key: KAFKA-4113
> URL: https://issues.apache.org/jira/browse/KAFKA-4113
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Major
>
> On the mailing list, there are multiple requests about the possibility to 
> "fully populate" a KTable before actual stream processing starts.
> Even if it is somewhat difficult to define when the initial populating phase 
> should end, there are multiple possibilities:
> The main idea is that there is a rarely updated topic that contains the 
> data. Only after this topic has been read completely and the KTable is ready 
> should the application start processing. This would indicate that, on startup, 
> the current partition sizes must be fetched and stored, and after the KTable 
> has been populated up to those offsets, stream processing can start.
> Other discussed ideas are:
> 1) an initial fixed time period for populating
> (it might be hard for a user to estimate the correct value)
> 2) an "idle" period, ie, if no update to a KTable for a certain time is
> done, we consider it as populated
> 3) a timestamp cut off point, ie, all records with an older timestamp
> belong to the initial populating phase
> The API change is not decided yet, and the API design is part of this JIRA.
> One suggestion (for option (4)) was:
> {noformat}
> KTable table = builder.table("topic", 1000); // populate the table without 
> reading any other topics until we see one record with timestamp 1000.
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)