[jira] [Comment Edited] (KAFKA-5846) Use singleton NoOpConsumerRebalanceListener in subscribe() call where listener is not specified

2018-03-09 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-5846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191405#comment-16191405
 ] 

Ted Yu edited comment on KAFKA-5846 at 3/10/18 12:42 AM:
-

Patch looks good to me.


was (Author: yuzhih...@gmail.com):
Patch looks good to me .

> Use singleton NoOpConsumerRebalanceListener in subscribe() call where 
> listener is not specified
> ---
>
> Key: KAFKA-5846
> URL: https://issues.apache.org/jira/browse/KAFKA-5846
> Project: Kafka
>  Issue Type: Task
>  Components: clients
>Reporter: Ted Yu
>Assignee: Kamal Chandraprakash
>Priority: Minor
>
> Currently KafkaConsumer creates an instance of NoOpConsumerRebalanceListener 
> for each subscribe() call where a ConsumerRebalanceListener is not specified:
> {code}
> public void subscribe(Pattern pattern) {
> subscribe(pattern, new NoOpConsumerRebalanceListener());
> }
> {code}
> We can create a singleton NoOpConsumerRebalanceListener to be used in such 
> scenarios.
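
A minimal sketch of the proposed change (the field name and placement below are assumptions, not the committed patch). Since NoOpConsumerRebalanceListener is stateless, one shared instance is safe to reuse across calls:

{code:java}
// Hypothetical sketch: reuse one stateless listener instead of allocating
// a new NoOpConsumerRebalanceListener on every subscribe(Pattern) call.
private static final ConsumerRebalanceListener NO_OP_LISTENER =
        new NoOpConsumerRebalanceListener();

public void subscribe(Pattern pattern) {
    subscribe(pattern, NO_OP_LISTENER);
}
{code}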





[jira] [Created] (KAFKA-6634) Delay initiating the txn on producers until initializeTopology with EOS turned on

2018-03-09 Thread Guozhang Wang (JIRA)
Guozhang Wang created KAFKA-6634:


 Summary: Delay initiating the txn on producers until 
initializeTopology with EOS turned on
 Key: KAFKA-6634
 URL: https://issues.apache.org/jira/browse/KAFKA-6634
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang
Assignee: Guozhang Wang


In the Streams EOS implementation, the producer created for a task initiates a 
txn immediately after being created, in the constructor of `StreamTask`. 
However, because of the restoration process, the task may not process any data, 
and hence the producer may not send any records for that started txn, for a 
long time. With the default txn session timeout of 60 seconds, this means that 
if restoration takes longer than that, the producer will immediately get an 
error once it starts sending, saying its producer epoch is already old.

To fix this, we should consider initiating the txn only after the restoration 
phase is done. This has the caveat that if the producer has already been 
fenced, it will not be notified until then, in initializeTopology. But I think 
this should not be a correctness issue, since during the restoration process 
we do not make any changes to the processing state.
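
As a sketch of the proposed ordering (the class and field names below are assumptions, not actual Streams internals; only the `initTransactions()`/`beginTransaction()` calls are real Producer API):

{code:java}
import org.apache.kafka.clients.producer.Producer;

// Hypothetical sketch: start the txn lazily in initializeTopology(), after
// restoration, instead of in the task constructor, so a long restore phase
// cannot outlive the txn session timeout.
class EosTaskSketch {
    private final Producer<byte[], byte[]> producer;
    private boolean txnStarted = false;

    EosTaskSketch(Producer<byte[], byte[]> producer) {
        this.producer = producer;          // no txn work here any more
    }

    void initializeTopology() {
        if (!txnStarted) {
            producer.initTransactions();   // previously done eagerly
            producer.beginTransaction();   // deferred until restoration is done
            txnStarted = true;
        }
        // ... rest of topology initialization
    }
}
{code}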





[jira] [Assigned] (KAFKA-6264) Log cleaner thread may die on legacy segment containing messages whose offsets are too large

2018-03-09 Thread Dhruvil Shah (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dhruvil Shah reassigned KAFKA-6264:
---

Assignee: Dhruvil Shah  (was: Jiangjie Qin)

> Log cleaner thread may die on legacy segment containing messages whose 
> offsets are too large
> 
>
> Key: KAFKA-6264
> URL: https://issues.apache.org/jira/browse/KAFKA-6264
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.10.2.1, 1.0.0, 0.11.0.2
>Reporter: Jiangjie Qin
>Assignee: Dhruvil Shah
>Priority: Critical
> Fix For: 1.2.0
>
>
> We encountered a problem where some legacy log segments contain messages 
> whose offsets are larger than {{SegmentBaseOffset + Int.MaxValue}}.
> Prior to 0.10.2.0, we did not assert the offsets of messages when appending 
> them to the log segments. Due to KAFKA-5413, the log cleaner may append 
> messages whose offset is greater than {{base_offset + Int.MaxValue}} to the 
> segment during log compaction.
> After the brokers are upgraded, those log segments cannot be compacted 
> anymore, because compaction fails immediately due to the offset range 
> assertion we added to LogSegment.
> We have seen this issue in the __consumer_offsets topic, so it could be a 
> general problem. There is no easy way for users to recover from this case.
> One solution is to have the log cleaner split such a log segment once it sees 
> a message with a problematic offset, appending those messages to a separate 
> log segment with a larger base_offset.
> Due to the impact of the issue, we may want to consider backporting the fix 
> to the affected versions.
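
For context, a minimal sketch of the constraint involved (the class and method names are hypothetical; the real assertion lives in LogSegment): legacy formats store a record's offset as a 4-byte delta from the segment base offset, so it must fit in a signed int.

{code:java}
final class OffsetRangeCheck {
    // A record can live in a segment only if its offset, relative to the
    // segment base offset, fits in a signed 32-bit delta.
    static boolean fitsInSegment(long segmentBaseOffset, long messageOffset) {
        long delta = messageOffset - segmentBaseOffset;
        return delta >= 0 && delta <= Integer.MAX_VALUE;
    }
}
{code}

When the check fails, the proposed fix would roll a new segment whose base offset is the offending message's offset and continue appending there.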





[jira] [Resolved] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly

2018-03-09 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6622.

   Resolution: Fixed
Fix Version/s: 1.1.0

> GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly
> --
>
> Key: KAFKA-6622
> URL: https://issues.apache.org/jira/browse/KAFKA-6622
> Project: Kafka
>  Issue Type: Bug
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Fix For: 1.1.0
>
> Attachments: kafka batch iteration funtime.png
>
>
> When reading records from a consumer offsets batch, the entire batch is 
> decompressed multiple times (once per record) as part of calling 
> `batch.baseOffset`. This is a very expensive operation being called in a loop 
> for no reason:
> !kafka batch iteration funtime.png!





[jira] [Commented] (KAFKA-6622) GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly

2018-03-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6622?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393695#comment-16393695
 ] 

ASF GitHub Bot commented on KAFKA-6622:
---

hachikuji closed pull request #4661: KAFKA-6622 - fix performance issue in 
parsing consumer offsets
URL: https://github.com/apache/kafka/pull/4661
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 3b79544a502..63af1cb0ce9 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -555,8 +555,11 @@ class GroupMetadataManager(brokerId: Int,
           }
           pendingOffsets.remove(batch.producerId)
         } else {
+          var batchBaseOffset: Option[Long] = None
           for (record <- batch.asScala) {
             require(record.hasKey, "Group metadata/offset entry key should not be null")
+            if (batchBaseOffset.isEmpty)
+              batchBaseOffset = Some(record.offset)
             GroupMetadataManager.readMessageKey(record.key) match {

               case offsetKey: OffsetKey =>
@@ -573,9 +576,9 @@ class GroupMetadataManager(brokerId: Int,
                 } else {
                   val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                   if (isTxnOffsetCommit)
-                    pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                    pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                   else
-                    loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
+                    loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(batchBaseOffset, offsetAndMetadata))
                 }

               case groupMetadataKey: GroupMetadataKey =>


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> GroupMetadataManager.loadGroupsAndOffsets decompresses record batch needlessly
> --
>
> Key: KAFKA-6622
> URL: https://issues.apache.org/jira/browse/KAFKA-6622
> Project: Kafka
>  Issue Type: Bug
>Reporter: radai rosenblatt
>Assignee: radai rosenblatt
>Priority: Major
> Attachments: kafka batch iteration funtime.png
>
>
> When reading records from a consumer offsets batch, the entire batch is 
> decompressed multiple times (once per record) as part of calling 
> `batch.baseOffset`. This is a very expensive operation being called in a loop 
> for no reason:
> !kafka batch iteration funtime.png!





[jira] [Commented] (KAFKA-6631) Kafka Streams - Rebalancing exception in Kafka 1.0.0

2018-03-09 Thread Guozhang Wang (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6631?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393680#comment-16393680
 ] 

Guozhang Wang commented on KAFKA-6631:
--

Hello Alexander,

Thanks for reporting this issue. Looking through the logs, I think the root 
cause is that the assignment record being appended to the offsets topic is 
larger than the topic's max message size (max.message.bytes), which defaults 
to 1MB.

The reason you do not see it with one client is that with more clients, more 
bytes are needed to encode the assignment metadata in the SyncGroup request, 
so eventually it exceeds the limit.

Normally 1MB should be an appropriate limit, and since you mention seeing this 
issue only with three clients, I'm wondering if you have many partitions on 
your input topics? Note that the encoded size also grows linearly with the 
total number of partitions.

To work around this issue, try setting a larger value for the 
"__consumer_offsets" topic's "max.message.bytes" config (note this is a 
per-topic config).
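
One way to apply that per-topic override programmatically, as a sketch (the 2MB value is only an example, not a recommendation):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class RaiseOffsetsTopicMessageSize {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
            // Example value only: raise the per-topic limit to 2MB.
            Config config = new Config(Collections.singleton(
                    new ConfigEntry("max.message.bytes", "2097152")));
            admin.alterConfigs(Collections.singletonMap(topic, config)).all().get();
        }
    }
}
{code}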


> Kafka Streams - Rebalancing exception in Kafka 1.0.0
> 
>
> Key: KAFKA-6631
> URL: https://issues.apache.org/jira/browse/KAFKA-6631
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: Container Linux by CoreOS 1576.5.0
>Reporter: Alexander Ivanichev
>Priority: Critical
>
>  
> In Kafka Streams 1.0.0 we saw a strange rebalance error. Our stream app 
> performs window-based aggregations. Sometimes on start, when all stream 
> workers join, the app just crashes. If we enable only one worker it works 
> fine, and sometimes two workers work just fine, but when a third joins, the 
> app crashes again: some critical issue with rebalance.
> {code:java}
> 018-03-08T18:51:01.226243000Z org.apache.kafka.common.KafkaException: 
> Unexpected error from SyncGroup: The server experienced an unexpected error 
> when processing the request
> 2018-03-08T18:51:01.226557000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:566)
> 2018-03-08T18:51:01.22686Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:539)
> 2018-03-08T18:51:01.227328000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:808)
> 2018-03-08T18:51:01.22763Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:788)
> 2018-03-08T18:51:01.228152000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
> 2018-03-08T18:51:01.228449000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
> 2018-03-08T18:51:01.228897000Z at 
> org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
> 2018-03-08T18:51:01.229196000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:506)
> 2018-03-08T18:51:01.229673000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:353)
> 2018-03-08T18:51:01.229971000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:268)
> 2018-03-08T18:51:01.230436000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:214)
> 2018-03-08T18:51:01.230749000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:174)
> 2018-03-08T18:51:01.231065000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:364)
> 2018-03-08T18:51:01.231584000Z at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
> 2018-03-08T18:51:01.231911000Z at 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
> 2018-03-08T18:51:01.23219Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1138)
> 2018-03-08T18:51:01.232643000Z at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
> 2018-03-08T18:51:01.233121000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
> 2018-03-08T18:51:01.233409000Z at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
> 2018-03-08T18:51:01.23372Z at 
> org.apache.kafka.streams

[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-03-09 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393587#comment-16393587
 ] 

John Roesler commented on KAFKA-6535:
-

I don't think it's that important in this context, but it's worth bearing in 
mind generally that MAX_INT milliseconds is only about 25 days.

So this proposal is more like "set the default retention to the max allowed 
value".
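
For reference, the arithmetic behind that figure:

{code:java}
public class MaxIntMillis {
    public static void main(String[] args) {
        // Integer.MAX_VALUE milliseconds expressed in days:
        System.out.println(Integer.MAX_VALUE / (1000.0 * 60 * 60 * 24)); // ~24.86
    }
}
{code}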

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, allowing records with 
> old timestamps (think: bootstrapping, re-processing) to be pushed to them, 
> and just rely on the purging API to keep their storage small.





[jira] [Assigned] (KAFKA-6627) Console producer default config values override explicitly provided properties

2018-03-09 Thread Sandor Murakozi (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6627?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sandor Murakozi reassigned KAFKA-6627:
--

Assignee: Sandor Murakozi

> Console producer default config values override explicitly provided properties
> --
>
> Key: KAFKA-6627
> URL: https://issues.apache.org/jira/browse/KAFKA-6627
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: Jason Gustafson
>Assignee: Sandor Murakozi
>Priority: Minor
>  Labels: newbie
>
> Some producer properties can be provided through custom parameters (e.g. 
> {{\-\-request-required-acks}}) and explicitly through 
> {{\-\-producer-property}}. At the moment, some of the custom parameters have 
> default values which actually override explicitly provided properties. For 
> example, if you set {{\-\-producer-property acks=all}} when starting the 
> console producer, the argument will be ignored since 
> {{\-\-request-required-acks}} has a default value. 





[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2018-03-09 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393486#comment-16393486
 ] 

Matthias J. Sax commented on KAFKA-6399:


Thanks for the feedback. It's a tricky question and I am personally not sure 
what I prefer. My thinking is as follows: initially we used 30 seconds, which 
was too short because of store restore time. Since we set it to MAX_VALUE, I 
cannot remember any user issues related to the config. Thus, it might even be 
ok to keep the default at MAX_VALUE. Whether we still need MAX_VALUE is 
questionable though, as we moved the restore code into the main loop and got 
rid of the root cause that forced us to set it to MAX_VALUE. However, because 
I can't remember any issues with MAX_VALUE, even if we don't need such a high 
value, it seems to work in practice. We know from some user reports that 
processing time can vary a lot; thus, even if we set it to 5 minutes, it would 
bite some users if they don't increase the setting. Keeping MAX_VALUE would be 
a safe bet for this case. However, I am a little concerned about a badly 
behaving app that never times out if the default is MAX_VALUE: user code could 
loop infinitely, for example.

Long story short: I think it boils down to the question of whether we want the 
default setting to be robust with regard to "making progress" or more "error 
sensitive". I guess, for most cases, users want to (and should) adjust this 
value anyway, independently of what default we choose (either some users need 
to increase it, or other users should decrease it to enable error detection in 
the first place).
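
Whatever default lands, applications can pin the value explicitly. A minimal sketch (the app id and the 5-minute value are just examples from the discussion above):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsPollIntervalConfig {
    static Properties config() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Explicit override of whatever Streams default is chosen.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 5 * 60 * 1000);
        return props;
    }
}
{code}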

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of the consumer group even if 
> they were healthy, simply because they didn't call {{poll()}} during the 
> state restore phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now does call {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications 
> (i.e., targeting user code) that don't make progress any more during regular 
> operations.
> The open question is what a good default might be. Maybe the actual consumer 
> default of 30 seconds is sufficient. During one {{poll()}} roundtrip, we 
> would only call {{restoreConsumer.poll()}} once and restore a single batch of 
> records. This should take way less time than 30 seconds.





[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-03-09 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393457#comment-16393457
 ] 

Matthias J. Sax commented on KAFKA-6535:


Yes. It's a long though (not an integer). We should pass this config when 
creating repartition topics (cf. {{InternalTopicManager}}).
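
A minimal sketch of what that per-topic override could look like when the repartition topic is created (the class name is hypothetical; {{retention.ms}} and its -1 "unlimited" sentinel are standard topic-level config):

{code:java}
import java.util.Collections;
import java.util.Map;

final class RepartitionTopicConfigSketch {
    // retention.ms is parsed as a long on the broker; -1 means unlimited.
    static Map<String, String> retentionOverride() {
        return Collections.singletonMap("retention.ms", String.valueOf(-1L));
    }
}
{code}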

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, allowing records with 
> old timestamps (think: bootstrapping, re-processing) to be pushed to them, 
> and just rely on the purging API to keep their storage small.





[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-09 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393444#comment-16393444
 ] 

John Roesler commented on KAFKA-6474:
-

Also worth mentioning is [~guozhang]'s reply:
{quote}I think I agree with your proposed changes, in fact in order to not 
scatter
the test classes in two places maybe it's better to move all of them to the
new module. One caveat is that it will make streams' project hierarchy
inconsistent with other projects where the unit test classes are maintained
inside the main artifact package, but I think it is a good cost to pay,
plus once we start publishing test-util artifacts for other projects like
client and connect, we may face the same issue and need to do this
refactoring as well.
{quote}

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our 
> own tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.





[jira] [Commented] (KAFKA-6474) Rewrite test to use new public TopologyTestDriver

2018-03-09 Thread John Roesler (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393415#comment-16393415
 ] 

John Roesler commented on KAFKA-6474:
-

Hi Filipe,

I have been working on a similar task (KAFKA-6473) and discovered something 
that will probably be a problem for you.

This task requires the streams project to have a test dependency on the 
streams:test-utils project, but streams:test-utils already has a compile 
dependency on the streams project. I'm no Gradle expert, but as far as I can 
tell, there's no way to break this circular dependency, at least without doing 
something exotic in the Gradle config.

We had a discussion in this mailing list thread: [[DISCUSS] 
KIP-267|http://mail-archives.apache.org/mod_mbox/kafka-dev/201803.mbox/%3CCAAyirGsovAzRMLa91nd6rzceQgEcNcLMt7ZrXVN7M1Psj4jCmQ%40mail.gmail.com%3E]. Here's what we settled on:

 
{quote}I would propose we restructure the streams directory thusly:
streams/ (artifact name := "streams", the actual streams code lives here)
- test-utils/ (this is the current test-utils artifact, depends on "streams")
- tests/ (new module, depends on "streams" and "test-utils", *NO published 
artifact*)
This gets us out of the circular dependency without having to engage in any 
Gradle shenanigans while preserving "test-utils" as a separate artifact. This 
is good because: 1) the test-utils don't need to be in production code, so it's 
nice to have a separate artifact, 2) test-utils is already public in 1.1, and 
it's a bummer to break users' code when we can so easily avoid it.{quote}
 
Another result of the discussion is that I'm actually going to side-step this 
issue for KAFKA-6473, so I won't be doing any restructuring in the course of my 
work. I'm just sharing these ideas with you for your context.
 
Hope you're well!
-John

> Rewrite test to use new public TopologyTestDriver
> -
>
> Key: KAFKA-6474
> URL: https://issues.apache.org/jira/browse/KAFKA-6474
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Affects Versions: 1.1.0
>Reporter: Matthias J. Sax
>Assignee: Filipe Agapito
>Priority: Major
>  Labels: beginner, newbie
>
> With KIP-247 we added the public TopologyTestDriver. We should rewrite our 
> own tests to use this new test driver and remove the two classes 
> ProcessorTopologyTestDriver and KStreamTestDriver.





[jira] [Commented] (KAFKA-6626) Performance bottleneck in Kafka Connect sendRecords

2018-03-09 Thread Ewen Cheslack-Postava (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6626?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393217#comment-16393217
 ] 

Ewen Cheslack-Postava commented on KAFKA-6626:
--

It cannot be a regular map, see the comment right above that field: 
https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L79-L80
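
To illustrate the semantics that comment is protecting, a generic sketch (not Connect code): the worker must track each in-flight record instance separately, even when two records compare equal.

{code:java}
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

public class IdentityVsEquality {
    public static void main(String[] args) {
        String a = new String("record");
        String b = new String("record"); // equal to a, but a distinct instance

        Map<String, Integer> byEquality = new HashMap<>();
        byEquality.put(a, 1);
        byEquality.put(b, 2);            // overwrites: a.equals(b)

        Map<String, Integer> byIdentity = new IdentityHashMap<>();
        byIdentity.put(a, 1);
        byIdentity.put(b, 2);            // kept separate: a != b

        System.out.println(byEquality.size()); // 1
        System.out.println(byIdentity.size()); // 2
    }
}
{code}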

> Performance bottleneck in Kafka Connect sendRecords
> ---
>
> Key: KAFKA-6626
> URL: https://issues.apache.org/jira/browse/KAFKA-6626
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Maciej Bryński
>Priority: Major
> Attachments: MapPerf.java, image-2018-03-08-08-35-19-247.png
>
>
> Kafka Connect is using IdentityHashMap for storing records.
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L239]
> Unfortunately this solution is very slow (2-4 times slower than normal 
> HashMap / HashSet).
> Benchmark result (code in attachment). 
> {code:java}
> Identity 4220
> Set 2115
> Map 1941
> Fast Set 2121
> {code}
> Things are even worse when using default GC configuration 
>  (-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 
> -XX:InitiatingHeapOccupancyPercent=35  -Djava.awt.headless=true)
> {code:java}
> Identity 7885
> Set 2364
> Map 1548
> Fast Set 1520
> {code}
> Java version
> {code:java}
> java version "1.8.0_152"
> Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
> Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)
> {code}
> This problem greatly slows down Kafka Connect.
> !image-2018-03-08-08-35-19-247.png!
>  





[jira] [Commented] (KAFKA-6399) Consider reducing "max.poll.interval.ms" default for Kafka Streams

2018-03-09 Thread Bill Bejeck (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6399?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16393169#comment-16393169
 ] 

Bill Bejeck commented on KAFKA-6399:


I'm +1 as well for reducing the {{max.poll.interval.ms}} config from 
{{Integer.MAX_VALUE}}, but I'm not sure about 30 seconds.

My reasoning is we should still set the value to something more conservative (5 
minutes is okay, I don't have another suggestion), as a hedge because we can't 
accurately predict user patterns concerning processing data. 

But we can add to the documentation the reason why we had it set to 
{{Integer.MAX_VALUE}} in the first place and what we've done to improve the 
rebalance process, and suggest that users reduce it further if need be.

> Consider reducing "max.poll.interval.ms" default for Kafka Streams
> --
>
> Key: KAFKA-6399
> URL: https://issues.apache.org/jira/browse/KAFKA-6399
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.0.0
>Reporter: Matthias J. Sax
>Priority: Minor
>
> In Kafka {{0.10.2.1}} we changed the default value of 
> {{max.poll.interval.ms}} for Kafka Streams to {{Integer.MAX_VALUE}}. The 
> reason was that long state restore phases during rebalance could yield 
> "rebalance storms", as consumers dropped out of the consumer group even if 
> they were healthy, simply because they didn't call {{poll()}} during the 
> state restore phase.
> In versions {{0.11}} and {{1.0}} the state restore logic was improved a lot, 
> and Kafka Streams now does call {{poll()}} even during the restore phase. 
> Therefore, we might consider setting a smaller timeout for 
> {{max.poll.interval.ms}} to detect badly behaving Kafka Streams applications 
> (i.e., targeting user code) that don't make progress any more during regular 
> operations.
> The open question is what a good default might be. Maybe the actual consumer 
> default of 30 seconds is sufficient. During one {{poll()}} roundtrip, we 
> would only call {{restoreConsumer.poll()}} once and restore a single batch of 
> records. This should take way less time than 30 seconds.





[jira] [Updated] (KAFKA-5863) Potential null dereference in DistributedHerder#reconfigureConnector()

2018-03-09 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-5863?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-5863:
--
Description: 
Here is the call chain:

{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}

  was:
Here is the call chain:
{code}
RestServer.httpRequest(reconfigUrl, "POST", 
taskProps, null);
{code}
In httpRequest():
{code}
} else if (responseCode >= 200 && responseCode < 300) {
InputStream is = connection.getInputStream();
T result = JSON_SERDE.readValue(is, responseFormat);
{code}
For readValue():
{code}
public <T> T readValue(InputStream src, TypeReference valueTypeRef)
throws IOException, JsonParseException, JsonMappingException
{
return (T) _readMapAndClose(_jsonFactory.createParser(src), 
_typeFactory.constructType(valueTypeRef));
{code}
Then there would be NPE in constructType():
{code}
public JavaType constructType(TypeReference typeRef)
{
// 19-Oct-2015, tatu: Simpler variant like so should work
return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
{code}


> Potential null dereference in DistributedHerder#reconfigureConnector()
> --
>
> Key: KAFKA-5863
> URL: https://issues.apache.org/jira/browse/KAFKA-5863
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Ted Yu
>Priority: Minor
>
> Here is the call chain:
> {code}
> RestServer.httpRequest(reconfigUrl, "POST", 
> taskProps, null);
> {code}
> In httpRequest():
> {code}
> } else if (responseCode >= 200 && responseCode < 300) {
> InputStream is = connection.getInputStream();
> T result = JSON_SERDE.readValue(is, responseFormat);
> {code}
> For readValue():
> {code}
> public <T> T readValue(InputStream src, TypeReference valueTypeRef)
> throws IOException, JsonParseException, JsonMappingException
> {
> return (T) _readMapAndClose(_jsonFactory.createParser(src), 
> _typeFactory.constructType(valueTypeRef));
> {code}
> Then there would be NPE in constructType():
> {code}
> public JavaType constructType(TypeReference typeRef)
> {
> // 19-Oct-2015, tatu: Simpler variant like so should work
> return _fromAny(null, typeRef.getType(), EMPTY_BINDINGS);
> {code}
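
A hypothetical guard for this path (a sketch only, not a committed fix): skip deserialization when no response type was requested, rather than passing a null TypeReference through to Jackson.

{code:java}
} else if (responseCode >= 200 && responseCode < 300) {
    // Hypothetical: callers that pass responseFormat == null get null back
    // instead of triggering the NPE in constructType().
    if (responseFormat == null)
        return null;
    InputStream is = connection.getInputStream();
    T result = JSON_SERDE.readValue(is, responseFormat);
    return result;
}
{code}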





[jira] [Closed] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1

2018-03-09 Thread Herbert Koelman (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Herbert Koelman closed KAFKA-6633.
--

> Is KafkaProducer still thread safe in version 1.0.1
> ---
>
> Key: KAFKA-6633
> URL: https://issues.apache.org/jira/browse/KAFKA-6633
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Herbert Koelman
>Priority: Minor
>
> The javadoc of version 
> [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]
>  states that producers are thread safe:
> {quote}{{The producer is _thread safe_ and should generally be shared among 
> all threads for best performance.}}
> {quote}
> Is it still the case in version 1.0.1? I failed to find this information in 
> the javadoc of version 1.0.1.
> Can I share one producer with many threads?
> (I posted this question as a bug, because I didn't know where else I could 
> post questions. Sorry)





[jira] [Commented] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1

2018-03-09 Thread Herbert Koelman (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392845#comment-16392845
 ] 

Herbert Koelman commented on KAFKA-6633:


Thanks for this quick answer.

> Is KafkaProducer still thread safe in version 1.0.1
> ---
>
> Key: KAFKA-6633
> URL: https://issues.apache.org/jira/browse/KAFKA-6633
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Herbert Koelman
>Priority: Minor
>
> The javadoc of version 
> [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]
>  states that producers are thread safe:
> {quote}{{The producer is _thread safe_ and should generally be shared among 
> all threads for best performance.}}
> {quote}
> Is it still the case in version 1.0.1? I failed to find this information in 
> the javadoc of version 1.0.1.
> Can I share one producer with many threads?
> (I posted this question as a bug, because I didn't know where else I could 
> post questions. Sorry)





[jira] [Commented] (KAFKA-6632) Very slow hashCode methods in Kafka Connect types

2018-03-09 Thread Maciej Bryński (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392790#comment-16392790
 ] 

Maciej Bryński commented on KAFKA-6632:
---

I also checked Struct schemas.

My implementation: 6026
Connect: 18880

> Very slow hashCode methods in Kafka Connect types
> -
>
> Key: KAFKA-6632
> URL: https://issues.apache.org/jira/browse/KAFKA-6632
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Maciej Bryński
>Priority: Major
>
> The hashCode method of ConnectSchema (and Field) is used a lot in SMTs and 
> fromConnect.
> Example:
> [https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164]
> Unfortunately it uses Objects.hash, which is very slow.
> I rewrote this with my own implementation and gained a 6x speedup.
> Microbenchmarks give:
>  * Original ConnectSchema hashCode: 2995ms
>  * My implementation: 517ms
> (1 iterations of calculating hashCode on a new 
> ConnectSchema(Schema.Type.STRING))
> {code:java}
> @Override
> public int hashCode() {
> int result = 5;
> result = 31 * result + type.hashCode();
> result = 31 * result + (optional ? 1 : 0);
> result = 31 * result + (defaultValue == null ? 0 : 
> defaultValue.hashCode());
> if (fields != null) {
> for (Field f : fields) {
> result = 31 * result + f.hashCode();
> }
> }
> result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode());
> result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode());
> result = 31 * result + (name == null ? 0 : name.hashCode());
> result = 31 * result + (version == null ? 0 : version);
> result = 31 * result + (doc == null ? 0 : doc.hashCode());
> if (parameters != null) {
> for (Map.Entry<String, String> e : parameters.entrySet()) {
> result = 31 * result + e.getKey().hashCode() + 
> e.getValue().hashCode();
> }
> }
> return result;
> }{code}





[jira] [Updated] (KAFKA-6632) Very slow hashCode methods in Kafka Connect types

2018-03-09 Thread Maciej Bryński (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maciej Bryński updated KAFKA-6632:
--
Description: 
The hashCode method of ConnectSchema (and Field) is used a lot in SMTs and 
fromConnect.

Example:

[https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164]

Unfortunately it uses Objects.hash, which is very slow.

I rewrote this with my own implementation and gained a 6x speedup.

Microbenchmarks give:
 * Original ConnectSchema hashCode: 2995ms
 * My implementation: 517ms

(1 iterations of calculating hashCode on a new 
ConnectSchema(Schema.Type.STRING))
{code:java}
@Override
public int hashCode() {
int result = 5;
result = 31 * result + type.hashCode();
result = 31 * result + (optional ? 1 : 0);
result = 31 * result + (defaultValue == null ? 0 : defaultValue.hashCode());
if (fields != null) {
for (Field f : fields) {
result = 31 * result + f.hashCode();
}
}
result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode());
result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode());
result = 31 * result + (name == null ? 0 : name.hashCode());
result = 31 * result + (version == null ? 0 : version);
result = 31 * result + (doc == null ? 0 : doc.hashCode());
if (parameters != null) {
for (Map.Entry<String, String> e : parameters.entrySet()) {
result = 31 * result + e.getKey().hashCode() + 
e.getValue().hashCode();
}
}
return result;
}{code}

  was:
hashCode method of ConnectSchema (and Field) is used a lot in SMT.

Example:

[https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164]

Unfortunately it's using Objects.hash which is very slow.

I rewrite this to own implementation and gain 6x speedup.

Microbencharks gives:
 * Original ConnectSchema hashCode: 2995ms
 * My implementation: 517ms

(1 iterations of calculating: hashCode for on new 
ConnectSchema(Schema.Type.STRING))
{code:java}
@Override
public int hashCode() {
int result = 5;
result = 31 * result + type.hashCode();
result = 31 * result + (optional ? 1 : 0);
result = 31 * result + (defaultValue == null ? 0 : defaultValue.hashCode());
if (fields != null) {
for (Field f : fields) {
result = 31 * result + f.hashCode();
}
}
result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode());
result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode());
result = 31 * result + (name == null ? 0 : name.hashCode());
result = 31 * result + (version == null ? 0 : version);
result = 31 * result + (doc == null ? 0 : doc.hashCode());
if (parameters != null) {
for (String s : parameters.keySet()) {
result = 31 * result + s.hashCode() + parameters.get(s).hashCode();
}
}
return result;
}{code}


> Very slow hashCode methods in Kafka Connect types
> -
>
> Key: KAFKA-6632
> URL: https://issues.apache.org/jira/browse/KAFKA-6632
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Maciej Bryński
>Priority: Major
>
> The hashCode method of ConnectSchema (and Field) is used a lot in SMTs and 
> fromConnect.
> Example:
> [https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164]
> Unfortunately it uses Objects.hash, which is very slow.
> I rewrote this with my own implementation and gained a 6x speedup.
> Microbenchmarks give:
>  * Original ConnectSchema hashCode: 2995ms
>  * My implementation: 517ms
> (1 iterations of calculating hashCode on a new 
> ConnectSchema(Schema.Type.STRING))
> {code:java}
> @Override
> public int hashCode() {
> int result = 5;
> result = 31 * result + type.hashCode();
> result = 31 * result + (optional ? 1 : 0);
> result = 31 * result + (defaultValue == null ? 0 : 
> defaultValue.hashCode());
> if (fields != null) {
> for (Field f : fields) {
> result = 31 * result + f.hashCode();
> }
> }
> result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode());
> result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode());
> result = 31 * result + (name == null ? 0 : name.hashCode());
> result = 31 * result + (version == null ? 0 : version);
> result = 31 * result + (doc == null ? 0 : doc.hashCode());
> if (parameters != null) {
> for (Map.Entry e : parameters.entrySet()) {
> result = 31 * result + e.getKe

[jira] [Resolved] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1

2018-03-09 Thread Mickael Maison (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-6633.
---
Resolution: Not A Problem

> Is KafkaProducer still thread safe in version 1.0.1
> ---
>
> Key: KAFKA-6633
> URL: https://issues.apache.org/jira/browse/KAFKA-6633
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Herbert Koelman
>Priority: Minor
>
> The javadoc of version 
> [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]
>  states that producers are thread safe:
> {quote}{{The producer is _thread safe_ and should generally be shared among 
> all threads for best performance.}}
> {quote}
> Is it still the case in version 1.0.1? I failed to find this information in 
> the javadoc of version 1.0.1.
> Can I share one producer with many threads?
> (I posted this question as a bug, because I didn't know where else I could 
> post questions. Sorry)





[jira] [Commented] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1

2018-03-09 Thread Mickael Maison (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6633?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392763#comment-16392763
 ] 

Mickael Maison commented on KAFKA-6633:
---

Yes, it is still the case. And it's also still in the javadoc!

See 
[http://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html]

or 
[https://github.com/apache/kafka/blob/1.0.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L85-L86|https://github.com/apache/kafka/blob/1.0.1/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L85-L86]

It's right at the top:
{code:java}
The producer is thread safe and sharing a single producer instance across 
threads will generally be faster than having multiple instances. {code}
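
A minimal sketch of that shared-producer pattern (topic name and serializer choices are just examples):

{code:java}
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SharedProducerExample {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One KafkaProducer instance shared by several threads.
        Producer<String, String> producer = new KafkaProducer<>(props);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            final int id = i;
            pool.execute(() -> producer.send(
                    new ProducerRecord<>("demo-topic", "thread-" + id, "hello")));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        producer.close(); // close once, after all threads are done sending
    }
}
{code}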

> Is KafkaProducer still thread safe in version 1.0.1
> ---
>
> Key: KAFKA-6633
> URL: https://issues.apache.org/jira/browse/KAFKA-6633
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 1.0.1
>Reporter: Herbert Koelman
>Priority: Minor
>
> The javadoc of version 
> [0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]
>  states that producers are thread safe:
> {quote}{{The producer is _thread safe_ and should generally be shared among 
> all threads for best performance.}}
> {quote}
> Is it still the case in version 1.0.1? I failed to find this information in 
> the javadoc of version 1.0.1.
> Can I share one producer with many threads?
> (I posted this question as a bug, because I didn't know where else I could 
> post questions. Sorry)





[jira] [Created] (KAFKA-6633) Is KafkaProducer still thread safe in version 1.0.1

2018-03-09 Thread Herbert Koelman (JIRA)
Herbert Koelman created KAFKA-6633:
--

 Summary: Is KafkaProducer still thread safe in version 1.0.1
 Key: KAFKA-6633
 URL: https://issues.apache.org/jira/browse/KAFKA-6633
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 1.0.1
Reporter: Herbert Koelman


The javadoc of version 
[0.8|https://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html]
 states that producers are thread safe:
{quote}{{The producer is _thread safe_ and should generally be shared among all 
threads for best performance.}}
{quote}

Is it still the case in version 1.0.1? I failed to find this information in 
the javadoc of version 1.0.1.

Can I share one producer with many threads?

(I posted this question as a bug, because I didn't know where else I could post 
questions. Sorry)





[jira] [Commented] (KAFKA-6535) Set default retention ms for Streams repartition topics to infinity

2018-03-09 Thread Khaireddine Rezgui (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16392692#comment-16392692
 ] 

Khaireddine Rezgui commented on KAFKA-6535:
---

Infinity == Integer.MAX_VALUE?

[~mjsax]

> Set default retention ms for Streams repartition topics to infinity
> ---
>
> Key: KAFKA-6535
> URL: https://issues.apache.org/jira/browse/KAFKA-6535
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> After KIP-220 / KIP-204, repartition topics in Streams are transient, so it 
> is better to set their default retention to infinity, allowing records with 
> old timestamps (think: bootstrapping, re-processing) to be pushed to them, 
> and just rely on the purging API to keep their storage small.





[jira] [Created] (KAFKA-6632) Very slow hashCode methods in Kafka Connect types

2018-03-09 Thread Maciej Bryński (JIRA)
Maciej Bryński created KAFKA-6632:
-

 Summary: Very slow hashCode methods in Kafka Connect types
 Key: KAFKA-6632
 URL: https://issues.apache.org/jira/browse/KAFKA-6632
 Project: Kafka
  Issue Type: Bug
Affects Versions: 1.0.0
Reporter: Maciej Bryński


The hashCode method of ConnectSchema (and Field) is used a lot in SMTs.

Example:

[https://github.com/apache/kafka/blob/e5d6c9a79a4ca9b82502b8e7f503d86ddaddb7fb/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L164]

Unfortunately it uses Objects.hash, which is very slow.

I rewrote this with my own implementation and gained a 6x speedup.

Microbenchmarks give:
 * Original ConnectSchema hashCode: 2995ms
 * My implementation: 517ms

(1 iterations of calculating hashCode on a new 
ConnectSchema(Schema.Type.STRING))
{code:java}
@Override
public int hashCode() {
int result = 5;
result = 31 * result + type.hashCode();
result = 31 * result + (optional ? 1 : 0);
result = 31 * result + (defaultValue == null ? 0 : defaultValue.hashCode());
if (fields != null) {
for (Field f : fields) {
result = 31 * result + f.hashCode();
}
}
result = 31 * result + (keySchema == null ? 0 : keySchema.hashCode());
result = 31 * result + (valueSchema == null ? 0 : valueSchema.hashCode());
result = 31 * result + (name == null ? 0 : name.hashCode());
result = 31 * result + (version == null ? 0 : version);
result = 31 * result + (doc == null ? 0 : doc.hashCode());
if (parameters != null) {
for (String s : parameters.keySet()) {
result = 31 * result + s.hashCode() + parameters.get(s).hashCode();
}
}
return result;
}{code}
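
For contrast, the reported slow pattern presumably looks like the sketch below (not necessarily the exact ConnectSchema source): {{Objects.hash}} allocates a varargs {{Object[]}} and boxes primitive fields on every call, which is what the hand-rolled version above avoids.

{code:java}
@Override
public int hashCode() {
    // Allocates an Object[10] and boxes 'optional' on every invocation.
    return Objects.hash(type, optional, defaultValue, fields,
                        keySchema, valueSchema, name, version, doc, parameters);
}
{code}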


