[jira] [Created] (KAFKA-7555) If the disk is full, Kafka will suddenly fail.

2018-10-25 Thread hajin kim (JIRA)
hajin kim created KAFKA-7555:


 Summary: If the disk is full, Kafka will suddenly fail. 
 Key: KAFKA-7555
 URL: https://issues.apache.org/jira/browse/KAFKA-7555
 Project: Kafka
  Issue Type: Bug
Reporter: hajin kim


If any of the log directories is full, Kafka shuts down and will not restart.
After that, I can recover by manually removing the saved logs, but this seems
clearly to be a Kafka problem. Is there a way to prevent or avoid it?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-658) Implement "Exact Mirroring" functionality in mirror maker

2018-10-25 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664592#comment-16664592
 ] 

Ryanne Dolan commented on KAFKA-658:


[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] 
(under discussion) aims to support this.

> Implement "Exact Mirroring" functionality in mirror maker
> -
>
> Key: KAFKA-658
> URL: https://issues.apache.org/jira/browse/KAFKA-658
> Project: Kafka
>  Issue Type: New Feature
>  Components: mirrormaker
>Reporter: Jay Kreps
>Priority: Major
>  Labels: project
>
> There are two ways to implement "mirroring" (i.e. replicating a topic from 
> one cluster to another):
> 1. Do a simple read from the source and write to the destination with no 
> attempt to maintain the same partitioning or offsets in the destination 
> cluster. In this case the destination cluster may have a different number of 
> partitions, and you can even read from many clusters to create a merged 
> cluster. This flexibility is nice. The downside is that since the 
> partitioning and offsets are not the same a consumer of the source cluster 
> has no equivalent position in the destination cluster. This is the style of 
> mirroring we have implemented in the mirror-maker tool and use for datacenter 
> replication today.
> 2. The second style of replication would only allow creating an exact replica 
> of a source cluster (i.e. all partitions and offsets exactly the same). The 
> nice thing about this is that the offsets and partitions would match exactly. 
> The downside is that it is not possible to merge multiple source clusters 
> this way or have different partitioning. We do not currently support this in 
> mirror maker.
> It would be nice to implement the second style as an option in mirror maker 
> as having an exact replica would be a nice option to have in the case where 
> you are replicating a single cluster only.
> There are some nuances: In order to maintain the exact offsets it is 
> important to guarantee that the producer never resends a message or loses a 
> message. As a result it would be important to have only a single producer for 
> each destination partition, and check the last produced message on startup 
> (using the getOffsets api) so that in the case of a hard crash messages that 
> are re-consumed are not re-emitted.
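
As an aside, the startup check sketched in the last paragraph maps naturally
onto today's consumer API (the issue predates it and refers to the older
getOffsets API). A hedged sketch, assuming the destination cluster is readable
with a consumer and that exact mirroring keeps source and destination offsets
identical; variable and method names are illustrative only:

{code:java}
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactMirrorStartup {
    // On (re)start, position the source consumer at the destination's end
    // offsets so messages produced before a hard crash are not re-emitted and
    // nothing is skipped. Only valid under the exact-mirroring assumption
    // that offsets match on both sides.
    static void resumeFromDestination(KafkaConsumer<byte[], byte[]> source,
                                      KafkaConsumer<byte[], byte[]> destination,
                                      List<TopicPartition> partitions) {
        Map<TopicPartition, Long> destEnd = destination.endOffsets(partitions);
        source.assign(partitions);
        for (Map.Entry<TopicPartition, Long> e : destEnd.entrySet()) {
            source.seek(e.getKey(), e.getValue());
        }
    }
}
{code}

The single-producer-per-destination-partition requirement from the description
still applies; this sketch only covers the crash-recovery probe.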



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5331) MirrorMaker can't guarantee data is consistent between clusters

2018-10-25 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5331?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664580#comment-16664580
 ] 

Ryanne Dolan commented on KAFKA-5331:
-

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] 
(under discussion) aims to solve this problem.

> MirrorMaker can't guarantee data is consistent between clusters
> ---
>
> Key: KAFKA-5331
> URL: https://issues.apache.org/jira/browse/KAFKA-5331
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, tools
>Affects Versions: 0.10.2.1
>Reporter: xuzq
>Priority: Critical
> Attachments: KAFKA-5331.patch
>
>
> MirrorMaker can't guarantee that data is consistent between clusters, even
> when the topic partition counts are equal.
> If we want to copy data from the old cluster to the new cluster while
> guaranteeing data consistency, we have no choice but to code a
> MirrorMakerMessageHandler type of class manually to achieve it (see the
> sketch after this description).
> I think we can add a parameter, such as data.consistency with a default
> value of true, to say whether or not you want to preserve ordering.
> Of course, you can set data.consistency=false if you don't need the data
> ordered identically between clusters, or if some topic partition counts
> differ between clusters.
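
As an illustration of what such a handler has to do, here is a minimal sketch
of the partition-pinning idea using the plain consumer/producer API (not
MirrorMaker's actual handler interface); it assumes equal partition counts on
both clusters, and all names are placeholders:

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionPinningMirror {
    // Forward each record to the *same* partition number on the target
    // cluster, preserving per-partition order when partition counts match.
    static void mirrorOnce(KafkaConsumer<byte[], byte[]> consumer,
                           KafkaProducer<byte[], byte[]> producer) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<byte[], byte[]> r : records) {
            producer.send(new ProducerRecord<>(r.topic(), r.partition(), r.key(), r.value()));
        }
    }
}
{code}

Pinning the destination partition to the source partition is what the proposed
data.consistency flag would guarantee.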



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3991) MirrorMaker: allow custom publisher

2018-10-25 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664573#comment-16664573
 ] 

Ryanne Dolan commented on KAFKA-3991:
-

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] 
(under discussion) would address this issue via custom SinkConnectors.

> MirrorMaker: allow custom publisher
> ---
>
> Key: KAFKA-3991
> URL: https://issues.apache.org/jira/browse/KAFKA-3991
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Affects Versions: 0.9.0.1
>Reporter: Hang Sun
>Priority: Minor
>  Labels: features
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
It would be nice if MirrorMaker allowed a custom publisher to be plugged in.
The use case we have is to use a customized REST publisher with the Kafka REST
Proxy (http://docs.confluent.io/2.0.0/kafka-rest/docs/index.html) to mirror
data across data centers via the public internet.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664502#comment-16664502
 ] 

ASF GitHub Bot commented on KAFKA-7548:
---

MayureshGharat opened a new pull request #5844: KAFKA-7548 : KafkaConsumer 
should not throw away already fetched data for paused partitions.
URL: https://github.com/apache/kafka/pull/5844
 
 
   In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
TopicPartitions, it seems that in the next call to poll we remove the 
completedFetches for those paused TopicPartitions. Normally, if an application 
is calling pause on a TopicPartition, it is likely to return to that 
TopicPartition in the near future, and when it does, with the current design we 
would have to re-fetch that data.
   
   The current patch does not throw away the already buffered data for paused 
partitions (see the sketch below). It also makes sure that if we already have 
buffered data for a recently unpaused partition, we will not send out another 
fetch request for that TopicPartition until that buffered data has been 
processed. This guarantees that we would have at most one completedFetch 
buffered for a paused TopicPartition.
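
For context, here is the consumer-side pattern the patch targets, as a minimal
hedged sketch against the public KafkaConsumer pause/resume API; the topic
name, partition, and processing steps are placeholders:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class PauseResumeSketch {
    static void pollWithPause(KafkaConsumer<String, String> consumer) {
        Set<TopicPartition> busy =
                Collections.singleton(new TopicPartition("events", 0));

        // Poll once; fetched data for all assigned partitions lands in the
        // consumer's internal completedFetches buffer.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        // Pause while downstream processing catches up. Before this patch,
        // the next poll dropped the buffered completedFetches for "busy".
        consumer.pause(busy);
        consumer.poll(Duration.ofMillis(100));

        // After resuming, the patch serves the still-buffered data first
        // instead of re-fetching it from the broker.
        consumer.resume(busy);
        consumer.poll(Duration.ofMillis(100));
    }
}
{code}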
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka 
> brokers and buffer it in the completedFetch queue. Now if we pause a few 
> partitions, it seems that in the next call to poll we remove the 
> completedFetches for those paused partitions. Normally, if an application is 
> calling pause on topicPartitions, it is likely to return to those 
> topicPartitions in the near future, and when it does, with the current design 
> we would have to re-fetch that data.
> At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
> would improve the performance for stream applications like Samza. We ran a 
> benchmark where we compared the throughput for different values of 
> maxPollRecords.
> We had a consumer subscribed to 10 partitions of a high-volume topic and 
> paused different numbers of partitions for every poll call. Here are the 
> results:
>  
>  
> *Before fix (records consumed; each run lasted ~60 seconds)*
> ||Partitions paused \ maxPollRecords||10||5||1||
> |0|8605320|8337690|6424753|
> |2|101910|49350|10495|
> |4|48420|24850|5004|
> |6|30420|15385|3152|
> |8|23390|11390|2237|
> |9|20230|10355|2087|
>  
> *After fix (records consumed; each run lasted ~60 seconds)*
> ||Partitions paused \ maxPollRecords||10||5||1||
> |0|8662740|8203445|5846512|
> |2|8257390|7776150|5269557|
> |4|7938510|7510140|5213496|
> |6|7100970|6382845|4519645|
> |8|6799956|6482421|4383300|
> |9|7045177|6465839|4884693|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Commented] (KAFKA-6963) KIP-310: Add a Kafka Source Connector to Kafka Connect

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664471#comment-16664471
 ] 

ASF GitHub Bot commented on KAFKA-6963:
---

rhysmccaig closed pull request #5438: KAFKA-6963: KIP-310: Add a Kafka Source 
Connector to Kafka Connect
URL: https://github.com/apache/kafka/pull/5438
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/build.gradle b/build.gradle
index 3e8558d4739..f883eb54658 100644
--- a/build.gradle
+++ b/build.gradle
@@ -516,7 +516,7 @@ for ( sv in availableScalaVersions ) {
   }
 }
 
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file', 'connect:basic-auth-extension', 'connect:kafka']
 def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:streams-scala', 'streams:test-utils', 'streams:examples'] + connectPkgs
 
 /** Create one task per default Scala version */
@@ -742,6 +742,8 @@ project(':core') {
     from(project(':connect:file').configurations.runtime) { into("libs/") }
     from(project(':connect:basic-auth-extension').jar) { into("libs/") }
     from(project(':connect:basic-auth-extension').configurations.runtime) { into("libs/") }
+    from(project(':connect:kafka').jar) { into("libs/") }
+    from(project(':connect:kafka').configurations.runtime) { into("libs/") }
     from(project(':streams').jar) { into("libs/") }
     from(project(':streams').configurations.runtime) { into("libs/") }
     from(project(':streams:streams-scala').jar) { into("libs/") }
@@ -1501,6 +1503,44 @@ project(':connect:basic-auth-extension') {
   }
 }
 
+project(':connect:kafka') {
+  archivesBaseName = "connect-kafka"
+
+  dependencies {
+    compile project(':connect:api')
+    compile project(':clients')
+    compile libs.slf4jApi
+
+    testCompile libs.bcpkix
+    testCompile libs.easymock
+    testCompile libs.junit
+    testCompile libs.powermockJunit4
+    testCompile libs.powermockEasymock
+    testCompile project(':clients').sourceSets.test.output
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+      include('log4j*jar')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
 task aggregatedJavadoc(type: Javadoc) {
   def projectsWithJavadoc = subprojects.findAll { it.javadoc.enabled }
   source = projectsWithJavadoc.collect { it.sourceSets.main.allJava }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 35f42e3a700..d23bc2bb8bc 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -370,6 +370,15 @@
[XML elements stripped by the mailing-list archive; the hunk appears to add
an import-control section for the new connect:kafka package]
diff --git a/config/connect-kafka-source.properties 
b/config/connect-kafka-source.properties
new file mode 100644
index 000..7564478e447
--- /dev/null
+++ b/config/connect-kafka-source.properties
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name=kafka-source
+tasks.max=1
+key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
+connector.class=org.apache.kafka.connect.kafka.KafkaSourceConnector
+source.bootstrap.server=kafka.bootstrap.server1:9092,kafka.bootstrap.server2:9093
+source.topic.whitelist=test.topic.*
+source.auto.offset.reset=earliest
+source.group.id=kafka-connect-testing
+destination.topics.prefix=aggregate.
diff --git 
a/connect/kafka/src/main/java/org/apache/kafka/connect/kafka/KafkaSourceConnector.java
 

[jira] [Commented] (KAFKA-7420) Global stores should be guarded as read-only for regular tasks

2018-10-25 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664348#comment-16664348
 ] 

Matthias J. Sax commented on KAFKA-7420:


Kafka supports two types of stores that can be added via 
`Topology#addStateStore()` and `Topology#addGlobalStore()`. Global stores are 
maintained by `GlobalStreamThread`, and only this thread is allowed to write 
into global stores.

However, any `Processor` (executed by a `StreamThread`) can get access to all 
global stores at runtime via `ProcessorContext#getStateStore(...)`, which 
allows those threads to also write into the store. This is implemented in 
`ProcessorContextImpl`. I think, for a fix, it would be sufficient to wrap any 
"global store" that is returned from `stateManager.getGlobalStore()` with a 
ReadOnlyStore, at least for the built-in stores (see the sketch below). For 
custom stores, which should be rare, we don't have enough information to guard 
against write access.

Does this help?
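
A minimal sketch of that wrapper idea, written against a simplified stand-in
interface rather than Kafka's actual `KeyValueStore` (which has many more
methods); all names below are hypothetical:

{code:java}
// Simplified stand-in for org.apache.kafka.streams.state.KeyValueStore.
interface SimpleKeyValueStore<K, V> {
    V get(K key);
    void put(K key, V value);
    V delete(K key);
}

// Forwards reads, rejects writes: what regular tasks would receive for a
// global store instead of the raw store.
final class ReadOnlyStoreWrapper<K, V> implements SimpleKeyValueStore<K, V> {
    private final SimpleKeyValueStore<K, V> inner;

    ReadOnlyStoreWrapper(SimpleKeyValueStore<K, V> inner) {
        this.inner = inner;
    }

    @Override
    public V get(K key) {
        return inner.get(key);  // reads pass through unchanged
    }

    @Override
    public void put(K key, V value) {
        throw new UnsupportedOperationException(
                "Global stores are read-only for regular tasks");
    }

    @Override
    public V delete(K key) {
        throw new UnsupportedOperationException(
                "Global stores are read-only for regular tasks");
    }
}
{code}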

> Global stores should be guarded as read-only for regular tasks
> --
>
> Key: KAFKA-7420
> URL: https://issues.apache.org/jira/browse/KAFKA-7420
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: newbie++
>
> Global stores should only be updated by the global thread. Any other task 
> should only read from a global store. However, when getting a reference to a 
> global store, all tasks have full read/write access to the store.
> We should put a guard in place and only return either _(a)_ a read-only 
> store, or _(b)_ wrap the store but throw an exception on write for regular 
> tasks.
> While the read-only store idea might be cleaner from an API point of view, we 
> should consider the second approach for 2 reasons: (1) it's backwards 
> compatible (of course, code might fail at runtime, but this seems to be ok, 
> as it indicates a bug in the user code anyway); (2) with regard to 
> [KIP-358|https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times],
>  we should have the more runtime-efficient methods at this level (currently, 
> global stores are only key-value stores and this argument falls a little 
> short; however, it might be a good idea to stay future proof; at least, 
> we should discuss it).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7554) zookeeper.session.timeout.ms Value

2018-10-25 Thread BELUGA BEHR (JIRA)
BELUGA BEHR created KAFKA-7554:
--

 Summary: zookeeper.session.timeout.ms Value
 Key: KAFKA-7554
 URL: https://issues.apache.org/jira/browse/KAFKA-7554
 Project: Kafka
  Issue Type: Improvement
  Components: zkclient
Reporter: BELUGA BEHR


{quote}
zookeeper.session.timeout.ms = 6000 (6s)
zookeeper.connection.timeout.ms = 6000 (6s)
{quote}
- https://kafka.apache.org/documentation/#configuration

Kind of an odd value?  Was it supposed to be 60000 (60s)?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7553) Jenkins PR tests hung

2018-10-25 Thread John Roesler (JIRA)
John Roesler created KAFKA-7553:
---

 Summary: Jenkins PR tests hung
 Key: KAFKA-7553
 URL: https://issues.apache.org/jira/browse/KAFKA-7553
 Project: Kafka
  Issue Type: Bug
Reporter: John Roesler
 Attachments: consoleText.txt

I wouldn't worry about this unless it continues to happen, but I wanted to 
document it.

This was a Java 11 build: 
[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/266/]

It was for this PR: [https://github.com/apache/kafka/pull/5795]

And this commit: 
[https://github.com/apache/kafka/pull/5795/commits/5bdcd0e023c6f406d585155399f6541bb6a9f9c2]

 

It looks like the tests just hung after 46 minutes, until the build timed out 
at 180 minutes.

End of the output:
{noformat}
...
00:46:27.275 kafka.server.ServerGenerateBrokerIdTest > 
testConsistentBrokerIdFromUserConfigAndMetaProps STARTED
00:46:29.775 
00:46:29.775 kafka.server.ServerGenerateBrokerIdTest > 
testConsistentBrokerIdFromUserConfigAndMetaProps PASSED
03:00:51.124 Build timed out (after 180 minutes). Marking the build as aborted.
03:00:51.440 Build was aborted
03:00:51.492 [FINDBUGS] Skipping publisher since build result is ABORTED
03:00:51.492 Recording test results
03:00:51.495 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:58.017 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.330 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.331 Adding one-line test results to commit status...
03:00:59.332 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.334 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.335 Setting status of 5bdcd0e023c6f406d585155399f6541bb6a9f9c2 to 
FAILURE with url https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/266/ 
and message: 'FAILURE
03:00:59.335  9053 tests run, 1 skipped, 0 failed.'
03:00:59.335 Using context: JDK 11 and Scala 2.12
03:00:59.541 Setting GRADLE_4_8_1_HOME=/home/jenkins/tools/gradle/4.8.1
03:00:59.542 Finished: ABORTED{noformat}
 

I did find one test that started but did not finish:
{noformat}
00:23:29.576 kafka.api.PlaintextConsumerTest > 
testLowMaxFetchSizeForRequestAndPartition STARTED
{noformat}
But note that the tests continued to run for another 23 minutes after this one 
started.

 

Just for completeness, there were 4 failures:
{noformat}
00:22:06.875 kafka.admin.ResetConsumerGroupOffsetTest > 
testResetOffsetsNotExistingGroup FAILED
00:22:06.875 java.util.concurrent.ExecutionException: 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.
00:22:06.875 at 
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
00:22:06.875 at 
org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
00:22:06.875 at 
org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
00:22:06.876 at 
org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:262)
00:22:06.876 at 
kafka.admin.ConsumerGroupCommand$ConsumerGroupService.resetOffsets(ConsumerGroupCommand.scala:307)
00:22:06.876 at 
kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsNotExistingGroup(ResetConsumerGroupOffsetTest.scala:89)
00:22:06.876 
00:22:06.876 Caused by:
00:22:06.876 
org.apache.kafka.common.errors.CoordinatorNotAvailableException: The 
coordinator is not available.{noformat}
 
{noformat}
00:25:22.175 kafka.api.CustomQuotaCallbackTest > testCustomQuotaCallback FAILED
00:25:22.175 java.lang.AssertionError: Partition [group1_largeTopic,69] 
metadata not propagated after 15000 ms
00:25:22.176 at kafka.utils.TestUtils$.fail(TestUtils.scala:351)
00:25:22.176 at 
kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:741)
00:25:22.176 at 
kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:831)
00:25:22.176 at 
kafka.utils.TestUtils$$anonfun$createTopic$2.apply(TestUtils.scala:330)
00:25:22.176 at 
kafka.utils.TestUtils$$anonfun$createTopic$2.apply(TestUtils.scala:329)
00:25:22.176 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
00:25:22.176 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
00:25:22.176 at 
scala.collection.Iterator$class.foreach(Iterator.scala:891)
00:25:22.176 at 
scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
00:25:22.176 at 
scala.collection.MapLike$DefaultKeySet.foreach(MapLike.scala:174)
00:25:22.176 at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
00:25:22.176 at 
scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
00:25:22.176 at scala.collection.SetLike$class.map(SetLike.scala:92)

[jira] [Created] (KAFKA-7552) StatefulProcessorNode tries to connect state store to processor before it is added

2018-10-25 Thread Adam Bellemare (JIRA)
Adam Bellemare created KAFKA-7552:
-

 Summary: StatefulProcessorNode tries to connect state store to 
processor before it is added
 Key: KAFKA-7552
 URL: https://issues.apache.org/jira/browse/KAFKA-7552
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.0.0, 2.1.0
Reporter: Adam Bellemare


StatefulProcessorNode tries to "connectProcessorAndStateStores" before 
"addStateStore" is called for the state store. This throws an exception. The 
current Kafka Streams tests do not appear to cover this, nor do any of the 
Kafka Streams applications use it. Discovered while looking to use the node 
for another ticket.

[https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StatefulProcessorNode.java#L86]

 

Results in "org.apache.kafka.streams.errors.TopologyException: Invalid 
topology: StateStore  is not added yet."



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6890) Add connector level configurability for producer/consumer client configs

2018-10-25 Thread Jordan Moore (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6890?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664027#comment-16664027
 ] 

Jordan Moore commented on KAFKA-6890:
-

[~rhauch], nice to meet you at Kafka Summit.
When we spoke, I mentioned that we currently have one large Connect cluster, 
primarily for S3 Connect, in each of our development environments. For some of 
the 100+ connector configurations that we have loaded, the topics they are 
sinking have variable amounts of throughput, so I think it would be beneficial 
to be able to tune at least some of the client properties per connector.

I know you brought up concerns about connectors being able to set 
"consumer.bootstrap.servers", for example, so perhaps setting 
"bootstrap.servers" (and other similar properties, such as SSL certificates 
for those servers) could be blacklisted in some way?

Also, as I mentioned on GitHub, this looks like it might duplicate KAFKA-4159.

> Add connector level configurability for producer/consumer client configs
> 
>
> Key: KAFKA-6890
> URL: https://issues.apache.org/jira/browse/KAFKA-6890
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Allen Tang
>Priority: Minor
>
> Right now, each source connector and sink connector inherit their client 
> configurations from the worker properties. Within the worker properties, all 
> configurations that have a prefix of "producer." or "consumer." are applied 
> to all source connectors and sink connectors respectively.
> We should also provide connector-level overrides whereby connector properties 
> that are prefixed with "producer." and "consumer." are used to feed into the 
> producer and consumer clients embedded within source and sink connectors 
> respectively. The prefixes will be removed via a String#substring() call, and 
> the remainder of the connector property key will be used as the client 
> configuration key. The value is fed directly to the client as the 
> configuration value.
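
The prefix handling described above is simple enough to sketch in a few lines;
this is an illustration of the proposal, not Connect's actual code, and the
class and method names are made up:

{code:java}
import java.util.HashMap;
import java.util.Map;

public class ClientConfigOverrides {
    // Keep only the properties carrying the given prefix ("producer." or
    // "consumer."), strip the prefix, and pass the remainder through as the
    // client configuration key/value.
    static Map<String, String> overridesFor(Map<String, String> connectorProps,
                                            String prefix) {
        Map<String, String> clientProps = new HashMap<>();
        for (Map.Entry<String, String> e : connectorProps.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // e.g. "producer.acks" -> "acks"
                clientProps.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return clientProps;
    }
}
{code}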



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7551) Refactor to create both producer & consumer in Worker

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7551?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16664021#comment-16664021
 ] 

ASF GitHub Bot commented on KAFKA-7551:
---

mageshn opened a new pull request #5842: KAFKA-7551: Refactor to create 
producer & consumer in the Worker.
URL: https://github.com/apache/kafka/pull/5842
 
 
   This is a minor refactoring that moves the creation of the producer and 
consumer into the Worker. Currently, the consumer is created in the 
WorkerSinkTask. This should not affect any functionality; it just makes the 
code structure easier to understand.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Refactor to create both producer & consumer in Worker
> -
>
> Key: KAFKA-7551
> URL: https://issues.apache.org/jira/browse/KAFKA-7551
> Project: Kafka
>  Issue Type: Task
>  Components: KafkaConnect
>Reporter: Magesh kumar Nandakumar
>Assignee: Magesh kumar Nandakumar
>Priority: Minor
> Fix For: 2.2.0
>
>
> In distributed mode,  the producer is created in the Worker and the consumer 
> is created in the WorkerSinkTask. The proposal is to refactor it so that both 
> of them are created in Worker. This will not affect any functionality and is 
> just a refactoring to make the code consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7551) Refactor to create both producer & consumer in Worker

2018-10-25 Thread Magesh kumar Nandakumar (JIRA)
Magesh kumar Nandakumar created KAFKA-7551:
--

 Summary: Refactor to create both producer & consumer in Worker
 Key: KAFKA-7551
 URL: https://issues.apache.org/jira/browse/KAFKA-7551
 Project: Kafka
  Issue Type: Task
  Components: KafkaConnect
Reporter: Magesh kumar Nandakumar
Assignee: Magesh kumar Nandakumar
 Fix For: 2.2.0


In distributed mode,  the producer is created in the Worker and the consumer is 
created in the WorkerSinkTask. The proposal is to refactor it so that both of 
them are created in Worker. This will not affect any functionality and is just 
a refactoring to make the code consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7535) KafkaConsumer doesn't report records-lag if isolation.level is read_committed

2018-10-25 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7535?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar updated KAFKA-7535:
-
Fix Version/s: (was: 2.0.2)
   (was: 2.1.1)
   2.0.1

> KafkaConsumer doesn't report records-lag if isolation.level is read_committed
> -
>
> Key: KAFKA-7535
> URL: https://issues.apache.org/jira/browse/KAFKA-7535
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Alexey Vakhrenev
>Assignee: lambdaliu
>Priority: Major
>  Labels: regression
> Fix For: 2.0.1, 2.1.0
>
>
> Starting from 2.0.0, {{KafkaConsumer}} doesn't report {{records-lag}} if 
> {{isolation.level}} is {{read_committed}}. The last version where it works 
> is {{1.1.1}}.
> The issue can be easily reproduced in {{kafka.api.PlaintextConsumerTest}} by 
> adding {{consumerConfig.setProperty("isolation.level", "read_committed")}} 
> within the related tests:
>  - {{testPerPartitionLagMetricsCleanUpWithAssign}}
>  - {{testPerPartitionLagMetricsCleanUpWithSubscribe}}
>  - {{testPerPartitionLagWithMaxPollRecords}}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7531) NPE NullPointerException at TransactionCoordinator handleEndTransaction

2018-10-25 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7531?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663927#comment-16663927
 ] 

Ismael Juma commented on KAFKA-7531:


cc [~hachikuji] in case he has thoughts.

> NPE NullPointerException at TransactionCoordinator handleEndTransaction
> ---
>
> Key: KAFKA-7531
> URL: https://issues.apache.org/jira/browse/KAFKA-7531
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Sebastian Puzoń
>Priority: Critical
> Fix For: 2.1.1, 2.0.2
>
>
> A Kafka cluster with 4 brokers, 1 topic (20 partitions), and 1 ZooKeeper.
> A Streams application with 4 instances, each running 5 stream threads, 20 
> stream threads in total.
> I observe a NullPointerException (NPE) on the coordinator broker which causes 
> all application stream threads to shut down. Here's the stack from the broker:
> {code:java}
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Member elog_agg-client-sswvlp6802-StreamThread-4-consumer-cbcd4704-a346-45ea-80f9-96f62fc2dabe in group elog_agg has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:50:46,755] INFO [GroupCoordinator 2]: Preparing to rebalance group elog_agg with old generation 49 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,519] INFO [GroupCoordinator 2]: Stabilized group elog_agg generation 50 (__consumer_offsets-21) (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:17,524] INFO [GroupCoordinator 2]: Assignment received from leader for group elog_agg for generation 50 (kafka.coordinator.group.GroupCoordinator)
> [2018-10-22 21:51:27,596] INFO [TransactionCoordinator id=2] Initialized transactionalId elog_agg-0_14 with producerId 1001 and producer epoch 20 on partition __transaction_state-16 (kafka.coordinator.transaction.TransactionCoordinator)
> [2018-10-22 21:52:00,920] ERROR [KafkaApi-2] Error when handling request {transactional_id=elog_agg-0_3,producer_id=1004,producer_epoch=16,transaction_result=true} (kafka.server.KafkaApis)
> java.lang.NullPointerException
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$7(TransactionCoordinator.scala:393)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at 
> kafka.coordinator.transaction.TransactionMetadata.inLock(TransactionMetadata.scala:172)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$5(TransactionCoordinator.scala:383)
>  at scala.util.Either$RightProjection.flatMap(Either.scala:702)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.sendTxnMarkersCallback$1(TransactionCoordinator.scala:372)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.$anonfun$handleEndTransaction$12$adapted(TransactionCoordinator.scala:437)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.updateCacheCallback$1(TransactionStateManager.scala:581)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15(TransactionStateManager.scala:619)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$15$adapted(TransactionStateManager.scala:619)
>  at kafka.server.DelayedProduce.onComplete(DelayedProduce.scala:129)
>  at kafka.server.DelayedOperation.forceComplete(DelayedOperation.scala:70)
>  at kafka.server.DelayedProduce.tryComplete(DelayedProduce.scala:110)
>  at 
> kafka.server.DelayedOperationPurgatory.tryCompleteElseWatch(DelayedOperation.scala:232)
>  at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:495)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.$anonfun$appendTransactionToLog$13(TransactionStateManager.scala:613)
>  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>  at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:251)
>  at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:257)
>  at 
> kafka.coordinator.transaction.TransactionStateManager.appendTransactionToLog(TransactionStateManager.scala:590)
>  at 
> kafka.coordinator.transaction.TransactionCoordinator.handleEndTransaction(TransactionCoordinator.scala:437)
>  at kafka.server.KafkaApis.handleEndTxnRequest(KafkaApis.scala:1653)
>  at kafka.server.KafkaApis.handle(KafkaApis.scala:132)
>  at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:69)
>  at java.lang.Thread.run(Thread.java:745)
> [2018-10-22 21:52:15,958] ERROR [KafkaApi-2] Error when handling request 
> {transactional_id=elog_agg-0_9,producer_id=1005,producer_epoch=8,transaction_result=true}
>  

[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-25 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663865#comment-16663865
 ] 

Lee Dongjin commented on KAFKA-7549:


[~ijuma] No problem. Let me have a look.

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-25 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7549:
--

Assignee: Lee Dongjin

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7420) Global stores should be guarded as read-only for regular tasks

2018-10-25 Thread Nikolay Izhikov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7420?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663782#comment-16663782
 ] 

Nikolay Izhikov commented on KAFKA-7420:


Hello, [~mjsax]

Can you clarify the ticket description, please?
Which global store are you writing about?
Can you provide the method name and interface I should start with?

> Global stores should be guarded as read-only for regular tasks
> --
>
> Key: KAFKA-7420
> URL: https://issues.apache.org/jira/browse/KAFKA-7420
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Nikolay Izhikov
>Priority: Minor
>  Labels: newbie++
>
> Global stores should only be updated by the global thread. Any other task 
> should only read from a global store. However, when getting a reference to a 
> global store, all tasks have full read/write access to the store.
> We should put a guard in place and only return either _(a)_ a read-only 
> store, or _(b)_ wrap the store but throw an exception on write for regular 
> tasks.
> While the read-only store idea might be cleaner from an API point of view, we 
> should consider the second approach for 2 reasons: (1) it's backwards 
> compatible (of course, code might fail at runtime, but this seems to be ok, 
> as it indicates a bug in the user code anyway); (2) with regard to 
> [KIP-358|https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times],
>  we should have the more runtime-efficient methods at this level (currently, 
> global stores are only key-value stores and this argument falls a little 
> short; however, it might be a good idea to stay future proof; at least, 
> we should discuss it).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-395) kafka.tools.MirrorMaker black/white list improvement

2018-10-25 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663721#comment-16663721
 ] 

Sönke Liebau edited comment on KAFKA-395 at 10/25/18 1:14 PM:
--

As this issue has not been touched in more than 6 years I think it is fairly 
safe to assume that we can close this.

Any discussions around mirroring functionality are better addressed in the 
MirrorMaker 2.0 KIP discussion.

Regarding the specifics of this issue: it can be worked around by placing the 
whitelisted topics into a file and paste-ing that file into the command as 
shown below. I believe that this should be sufficient as a workaround.
{code:bash}
./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties 
--producer.config ../config/producer.properties --whitelist "$(paste 
whitelist.topics -d'|' -s)" --blacklist "$(paste blacklist.topics -d'|' -s)"
{code}


was (Author: sliebau):
As this issue has not been touched in more than 6 years I think it is fairly 
safe to assume that we can close this.

Any discussions around mirroring functionality are better addressed in the 
MirrorMaker 2.0 KIP discussion.

Regarding the specifics of this issue, that can be worked around by placing 
whitelists topics into a file and the paste-ing that file into the command as 
shown below. I believe that this should be sufficient as a workaround.
{code:java}
// ./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties 
--producer.config ../config/producer.properties --whitelist "$(paste 
whitelist.topics -d'|' -s)" --blacklist "$(paste blacklist.topics -d'|' -s)"
{code}

> kafka.tools.MirrorMaker black/white list improvement
> 
>
> Key: KAFKA-395
> URL: https://issues.apache.org/jira/browse/KAFKA-395
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Dave DeMaagd
>Priority: Minor
>
> Currently, black/white list topics are specified directly on the command 
> line; while functional, this has two drawbacks:
> 1) Changes become unwieldy if there are a large number of running instances - 
> potentially many instances to restart, which can have implications for data 
> stream lag
> 2) Maintaining the list itself can become increasingly complex if there are a 
> large number of elements in the list (particularly if they are complex 
> expressions)
> Suggest extending the way that black/white lists can be fed to the mirror 
> maker application, in particular, being able to specify the black/white list 
> as a file (or possibly a generic URI).  Thinking that this could be 
> accomplished either by adding '--whitelistfile' and '--blacklistfile' command 
> line parameters, or modifying the existing '--blacklist' and '--whitelist' 
> parameters to include a 'is this a valid file?' test and decide how to handle 
> it based on that (if it is a file, read it, if not, use current behavior). 
> Follow up suggestion would be to have the mirror maker process check for 
> updates to the list file, and on change, validate and reload it, and run from 
> that point with the new list information. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-25 Thread Ismael Juma (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663722#comment-16663722
 ] 

Ismael Juma commented on KAFKA-7549:


[~dongjin] would you have time to look into this?

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-395) kafka.tools.MirrorMaker black/white list improvement

2018-10-25 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/KAFKA-395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sönke Liebau resolved KAFKA-395.

Resolution: Workaround

As this issue has not been touched in more than 6 years I think it is fairly 
safe to assume that we can close this.

Any discussions around mirroring functionality are better addressed in the 
MirrorMaker 2.0 KIP discussion.

Regarding the specifics of this issue: it can be worked around by placing the 
whitelisted topics into a file and paste-ing that file into the command as 
shown below. I believe that this should be sufficient as a workaround.
{code:bash}
./kafka-mirror-maker.sh --consumer.config ../config/consumer.properties 
--producer.config ../config/producer.properties --whitelist "$(paste 
whitelist.topics -d'|' -s)" --blacklist "$(paste blacklist.topics -d'|' -s)"
{code}

> kafka.tools.MirrorMaker black/white list improvement
> 
>
> Key: KAFKA-395
> URL: https://issues.apache.org/jira/browse/KAFKA-395
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Dave DeMaagd
>Priority: Minor
>
> Currently, black/white list topics are specified directly on the command 
> line; while functional, this has two drawbacks:
> 1) Changes become unwieldy if there are a large number of running instances - 
> potentially many instances to restart, which can have implications for data 
> stream lag
> 2) Maintaining the list itself can become increasingly complex if there are a 
> large number of elements in the list (particularly if they are complex 
> expressions)
> Suggest extending the way that black/white lists can be fed to the mirror 
> maker application, in particular, being able to specify the black/white list 
> as a file (or possibly a generic URI).  Thinking that this could be 
> accomplished either by adding '--whitelistfile' and '--blacklistfile' command 
> line parameters, or modifying the existing '--blacklist' and '--whitelist' 
> parameters to include a 'is this a valid file?' test and decide how to handle 
> it based on that (if it is a file, read it, if not, use current behavior). 
> Follow up suggestion would be to have the mirror maker process check for 
> updates to the list file, and on change, validate and reload it, and run from 
> that point with the new list information. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-25 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-7549:
---
Fix Version/s: 2.1.0

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-5462) Add a configuration for users to specify a template for building a custom principal name

2018-10-25 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-5462.
--
   Resolution: Fixed
Fix Version/s: 2.2.0

> Add a configuration for users to specify a template for building a custom 
> principal name
> 
>
> Key: KAFKA-5462
> URL: https://issues.apache.org/jira/browse/KAFKA-5462
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.10.2.1
>Reporter: Koelli Mungee
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.2.0
>
>
> Add a configuration for users to specify a template for building a custom 
> principal name.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5462) Add a configuration for users to specify a template for building a custom principal name

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663625#comment-16663625
 ] 

ASF GitHub Bot commented on KAFKA-5462:
---

omkreddy closed pull request #5684: KAFKA-5462: Add configuration to build 
custom SSL principal name (KIP-371)
URL: https://github.com/apache/kafka/pull/5684
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
index a29d8069b99..e3a8a774a51 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/config/internals/BrokerSecurityConfigs.java
@@ -34,17 +34,28 @@
 public static final String SSL_CLIENT_AUTH_CONFIG = "ssl.client.auth";
 public static final String SASL_ENABLED_MECHANISMS_CONFIG = 
"sasl.enabled.mechanisms";
 public static final String SASL_SERVER_CALLBACK_HANDLER_CLASS = 
"sasl.server.callback.handler.class";
+public static final String SSL_PRINCIPAL_MAPPING_RULES_CONFIG = 
"ssl.principal.mapping.rules";
 
 public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully 
qualified name of a class that implements the " +
 "KafkaPrincipalBuilder interface, which is used to build the 
KafkaPrincipal object used during " +
 "authorization. This config also supports the deprecated 
PrincipalBuilder interface which was previously " +
 "used for client authentication over SSL. If no principal builder 
is defined, the default behavior depends " +
-"on the security protocol in use. For SSL authentication, the 
principal name will be the distinguished " +
+"on the security protocol in use. For SSL authentication,  the 
principal will be derived using the" +
+" rules defined by " + SSL_PRINCIPAL_MAPPING_RULES_CONFIG + 
" applied on the distinguished " +
 "name from the client certificate if one is provided; otherwise, 
if client authentication is not required, " +
 "the principal name will be ANONYMOUS. For SASL authentication, 
the principal will be derived using the " +
 "rules defined by " + 
SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG + " if GSSAPI is in use, " 
+
 "and the SASL authentication ID for other mechanisms. For 
PLAINTEXT, the principal will be ANONYMOUS.";
 
+public static final String SSL_PRINCIPAL_MAPPING_RULES_DOC = "A list of 
rules for mapping from distinguished name" +
+" from the client certificate to short name. The rules are 
evaluated in order and the first rule that matches" +
+" a principal name is used to map it to a short name. Any later 
rules in the list are ignored. By default," +
+" distinguished name of the X.500 certificate will be the 
principal. For more details on the format please" +
+" see  security authorization and 
acls. Note that this configuration is ignored" +
+" if an extension of KafkaPrincipalBuilder is provided by the 
" + PRINCIPAL_BUILDER_CLASS_CONFIG + "" +
+   " configuration.";
+public static final List<String> DEFAULT_SSL_PRINCIPAL_MAPPING_RULES = Collections.singletonList("DEFAULT");
+
 public static final String SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC = "A 
list of rules for mapping from principal " +
 "names to short names (typically operating system usernames). The 
rules are evaluated in order and the " +
 "first rule that matches a principal name is used to map it to a 
short name. Any later rules in the list are " +
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 10779b7881f..b3040f3ef73 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
+import org.apache.kafka.common.security.ssl.SslPrincipalMapper;
 import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
 import org.apache.kafka.common.utils.Utils;
 
@@ -163,12 +164,13 @@ private static void requireNonNullMode(Mode mode, 
SecurityProtocol securityProto
 public static KafkaPrincipalBuilder 

[jira] [Commented] (KAFKA-7535) KafkaConsumer doesn't report records-lag if isolation.level is read_committed

2018-10-25 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16663595#comment-16663595
 ] 

Manikumar commented on KAFKA-7535:
--

As mentioned in the JIRA description, this only impacts the consumer's 
records-lag metric. We can include the fix in 2.0.1.

> KafkaConsumer doesn't report records-lag if isolation.level is read_committed
> -
>
> Key: KAFKA-7535
> URL: https://issues.apache.org/jira/browse/KAFKA-7535
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.0.0
>Reporter: Alexey Vakhrenev
>Assignee: lambdaliu
>Priority: Major
>  Labels: regression
> Fix For: 2.1.0, 2.1.1, 2.0.2
>
>
> Starting from 2.0.0, {{KafkaConsumer}} doesn't report {{records-lag}} if 
> {{isolation.level}} is {{read_committed}}. The last version where it works 
> is {{1.1.1}}.
> The issue can be easily reproduced in {{kafka.api.PlaintextConsumerTest}} by 
> adding {{consumerConfig.setProperty("isolation.level", "read_committed")}} 
> within the related tests:
>  - {{testPerPartitionLagMetricsCleanUpWithAssign}}
>  - {{testPerPartitionLagMetricsCleanUpWithSubscribe}}
>  - {{testPerPartitionLagWithMaxPollRecords}}
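For anyone reproducing this outside the test suite, a minimal hedged sketch of a consumer that exercises the affected code path (broker address, topic, and group id are placeholders):

{noformat}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LagRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "lag-check");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // the setting that suppresses the metric

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test-topic"));
            consumer.poll(Duration.ofSeconds(1));
            // On 2.0.0 the per-partition records-lag metrics are missing; on 1.1.1 they appear.
            consumer.metrics().forEach((name, metric) -> {
                if (name.name().contains("records-lag"))
                    System.out.println(name.name() + " " + name.tags() + " = " + metric.metricValue());
            });
        }
    }
}
{noformat}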



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7550) 0-byte log segments on topics with log compaction

2018-10-25 Thread Jakub Korab (JIRA)
Jakub Korab created KAFKA-7550:
--

 Summary: 0-byte log segments on topics with log compaction
 Key: KAFKA-7550
 URL: https://issues.apache.org/jira/browse/KAFKA-7550
 Project: Kafka
  Issue Type: Bug
  Components: log
Affects Versions: 1.0.0
Reporter: Jakub Korab


In production, I am seeing some old log segments that are 0 bytes in length. 
There are no associated .index or .timeindex files. These topics have log 
compaction turned on.

The segments are not causing any issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-25 Thread Magnus Edenhill (JIRA)
Magnus Edenhill created KAFKA-7549:
--

 Summary: Old ProduceRequest with zstd compression does not return 
error to client
 Key: KAFKA-7549
 URL: https://issues.apache.org/jira/browse/KAFKA-7549
 Project: Kafka
  Issue Type: Bug
  Components: compression
Reporter: Magnus Edenhill


Kafka broker v2.1.0rc0.

 

KIP-110 states that:

"Zstd will only be allowed for the bumped produce API. That is, for older 
version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
regardless of the message format."

 

However, sending a ProduceRequest V3 with zstd compression (which is a 
client-side bug) closes the connection with the following exception rather than 
returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:

 
{noformat}
[2018-10-25 11:40:31,813] ERROR Exception while processing request from 
127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
for apiKey: PRODUCE, apiVersion: 3, connectionId: 
127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
principal: User:ANONYMOUS
Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
requests with version 3 are note allowed to use ZStandard compression
{noformat}
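For reference, the producer-side opt-in that triggers this path; per KIP-110, only 2.1+ clients can negotiate the bumped produce API version that allows it. A hedged config sketch:

{noformat}
# producer.properties on a >= 2.1 client; older clients cannot use zstd
compression.type=zstd
{noformat}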



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7546) Java implementation for Authorizer

2018-10-25 Thread Manikumar (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663494#comment-16663494
 ] 

Manikumar commented on KAFKA-7546:
--

You can check this implementation: 
https://github.com/apache/ranger/blob/master/plugin-kafka/src/main/java/org/apache/ranger/authorization/kafka/authorizer/RangerKafkaAuthorizer.java

> Java implementation for Authorizer
> --
>
> Key: KAFKA-7546
> URL: https://issues.apache.org/jira/browse/KAFKA-7546
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Pradeep Bansal
>Priority: Major
> Attachments: AuthorizerImpl.PNG
>
>
> I am using Kafka with authentication and authorization. I want to plug in my 
> own implementation of Authorizer which doesn't use ZooKeeper but instead keeps 
> its permission mappings in a SQL database. Is it possible to write the 
> Authorizer code in Java?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7546) Java implementation for Authorizer

2018-10-25 Thread Pradeep Bansal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663483#comment-16663483
 ] 

Pradeep Bansal commented on KAFKA-7546:
---

Thanks [~mgharat] for the insights. I have not worked with Scala before, and the 
[Authorizer|https://github.com/apache/kafka/blob/3cdc78e6bb1f83973a14ce1550fe3874f7348b05/core/src/main/scala/kafka/security/auth/Authorizer.scala]
 interface is in Scala. Will I be able to write a Java class for this interface, 
like the one below?

 

!AuthorizerImpl.PNG!

 

Thanks,

Pradeep

> Java implementation for Authorizer
> --
>
> Key: KAFKA-7546
> URL: https://issues.apache.org/jira/browse/KAFKA-7546
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Pradeep Bansal
>Priority: Major
> Attachments: AuthorizerImpl.PNG
>
>
> I am using Kafka with authentication and authorization. I want to plug in my 
> own implementation of Authorizer which doesn't use ZooKeeper but instead keeps 
> its permission mappings in a SQL database. Is it possible to write the 
> Authorizer code in Java?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7546) Java implementation for Authorizer

2018-10-25 Thread Pradeep Bansal (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pradeep Bansal updated KAFKA-7546:
--
Attachment: AuthorizerImpl.PNG

> Java implementation for Authorizer
> --
>
> Key: KAFKA-7546
> URL: https://issues.apache.org/jira/browse/KAFKA-7546
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Pradeep Bansal
>Priority: Major
> Attachments: AuthorizerImpl.PNG
>
>
> I am using Kafka with authentication and authorization. I want to plug in my 
> own implementation of Authorizer which doesn't use ZooKeeper but instead keeps 
> its permission mappings in a SQL database. Is it possible to write the 
> Authorizer code in Java?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7402) Kafka Streams should implement AutoCloseable where appropriate

2018-10-25 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663439#comment-16663439
 ] 

ASF GitHub Bot commented on KAFKA-7402:
---

shunge opened a new pull request #5839: KAFKA-7402: Kafka Streams should 
implement AutoCloseable where approp…
URL: https://github.com/apache/kafka/pull/5839
 
 
   KAFKA-7402: Kafka Streams should implement AutoCloseable where appropriate.
   KIP 376: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-376%3A+Implement+AutoClosable+on+appropriate+classes+that+want+to+be+used+in+a+try-with-resource+statement
   
   ### Committer Checklist (excluded from commit message)
   - [X] Verify design and implementation 
   - [X] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kafka Streams should implement AutoCloseable where appropriate
> --
>
> Key: KAFKA-7402
> URL: https://issues.apache.org/jira/browse/KAFKA-7402
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Yishun Guan
>Priority: Minor
>  Labels: needs-kip, newbie
>
> Various components in Streams have close methods but do not implement 
> AutoCloseable. This means that they can't be used in try-with-resources 
> blocks.
> Remedying that would simplify our tests and make life easier for users as 
> well.
> KafkaStreams itself is a notable example of this, but we can take the 
> opportunity to look for other components that make sense as AutoCloseable as 
> well.
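As an illustration of what the KIP enables once {{KafkaStreams}} implements AutoCloseable, a hedged sketch with the topology and props elided:

{noformat}
try (KafkaStreams streams = new KafkaStreams(topology, props)) {
    streams.start();
    // ... do work; close() is invoked automatically when the block exits
}
{noformat}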



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7546) Java implementation for Authorizer

2018-10-25 Thread Mayuresh Gharat (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663321#comment-16663321
 ] 

Mayuresh Gharat commented on KAFKA-7546:


Hi [~bansalp],

 

Kafka provides Authorizer as an interface. You should be able to implement the 
interface and plug in your own implementation by setting your AuthorizerImpl 
class name in the "authorizer.class.name" config. At LinkedIn, we have our own 
implementation of Authorizer that does not use ZooKeeper for authorization but 
instead talks to our in-house ACL store. You should be able to do the same; see 
the sketch below.
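For concreteness, a rough, hypothetical Java skeleton against the Scala kafka.security.auth.Authorizer trait (the class name, package, and SQL lookups are invented for illustration; the Scala collection types in the ACL-management signatures are the main friction when implementing the trait from Java):

{noformat}
package com.example.security;

import java.util.Map;

import kafka.network.RequestChannel;
import kafka.security.auth.Acl;
import kafka.security.auth.Authorizer;
import kafka.security.auth.Operation;
import kafka.security.auth.Resource;
import org.apache.kafka.common.security.auth.KafkaPrincipal;

public class SqlAuthorizer implements Authorizer {

    @Override
    public void configure(Map<String, ?> configs) {
        // open the SQL connection pool using values from the broker config
    }

    @Override
    public boolean authorize(RequestChannel.Session session, Operation operation, Resource resource) {
        KafkaPrincipal principal = session.principal();
        // look up (principal, operation, resource) in the SQL-backed ACL table; default deny
        return false;
    }

    @Override
    public void addAcls(scala.collection.immutable.Set<Acl> acls, Resource resource) {
        // insert rows for the given resource
    }

    @Override
    public boolean removeAcls(scala.collection.immutable.Set<Acl> acls, Resource resource) {
        return false; // delete matching rows
    }

    @Override
    public boolean removeAcls(Resource resource) {
        return false; // delete all rows for the resource
    }

    @Override
    public scala.collection.immutable.Set<Acl> getAcls(Resource resource) {
        return scala.collection.immutable.Set$.MODULE$.empty();
    }

    @Override
    public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls(KafkaPrincipal principal) {
        return scala.collection.immutable.Map$.MODULE$.empty();
    }

    @Override
    public scala.collection.immutable.Map<Resource, scala.collection.immutable.Set<Acl>> getAcls() {
        return scala.collection.immutable.Map$.MODULE$.empty();
    }

    @Override
    public void close() {
        // release the pool
    }
}
{noformat}

It would then be registered on each broker (class name hypothetical) with:

{noformat}
authorizer.class.name=com.example.security.SqlAuthorizer
{noformat}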

 

Hope this helps.

 

Thanks,

 

Mayuresh

> Java implementation for Authorizer
> --
>
> Key: KAFKA-7546
> URL: https://issues.apache.org/jira/browse/KAFKA-7546
> Project: Kafka
>  Issue Type: Improvement
>  Components: security
>Reporter: Pradeep Bansal
>Priority: Major
>
> I am using Kafka with authentication and authorization. I want to plug in my 
> own implementation of Authorizer which doesn't use ZooKeeper but instead keeps 
> its permission mappings in a SQL database. Is it possible to write the 
> Authorizer code in Java?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from the Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we would have to re-fetch that 
data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from the Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we would have to re-fetch that 
data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:
{noformat}*no* further _formatting_ is done here{noformat}
*Before fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from kafka 
> brokers that is 

[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from the Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we would have to re-fetch that 
data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:
{noformat}*no* further _formatting_ is done here{noformat}
*Before fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from the Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we would have to re-fetch that 
data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from kafka 
> brokers that is 

[jira] [Updated] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
---
Description: 
In KafkaConsumer, when we do a poll, we fetch data asynchronously from the Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we would have to re-fetch that 
data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from the Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we would have to re-fetch that 
data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> ---
>
> Key: KAFKA-7548
> URL: https://issues.apache.org/jira/browse/KAFKA-7548
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Mayuresh Gharat
>Assignee: Mayuresh Gharat
>Priority: Major
>
> In KafkaConsumer, when we do a poll, we fetch data asynchronously from kafka 
> brokers that is buffered in completedFetch queue. Now if we pause a few 
> partitions, it seems that 

[jira] [Created] (KAFKA-7548) KafkaConsumer should not throw away already fetched data for paused partitions.

2018-10-25 Thread Mayuresh Gharat (JIRA)
Mayuresh Gharat created KAFKA-7548:
--

 Summary: KafkaConsumer should not throw away already fetched data 
for paused partitions.
 Key: KAFKA-7548
 URL: https://issues.apache.org/jira/browse/KAFKA-7548
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Mayuresh Gharat
Assignee: Mayuresh Gharat


In KafkaConsumer, when we do a poll, we fetch data asynchronously from the Kafka 
brokers and buffer it in the completedFetch queue. Now if we pause a few 
partitions, it seems that in the next call to poll we remove the completedFetches 
for those paused partitions. Normally, if an application is calling pause on 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we would have to re-fetch that 
data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared the throughput for different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a different number of partitions for every poll call. Here are the results:

*Before fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8605320 (60.022276059 sec)|8337690 (60.026690095 sec)|6424753 (60.67003 sec)|
|2|101910 (60.006989628 sec)|49350 (60.022598668 sec)|10495 (60.020077555 sec)|
|4|48420 (60.022096537 sec)|24850 (60.007451162 sec)|5004 (60.009773507 sec)|
|6|30420 (60.018380086 sec)|15385 (60.011912135 sec)|3152 (60.013573487 sec)|
|8|23390 (60.043122495 sec)|11390 (60.013297496 sec)|2237 (60.038921333 sec)|
|9|20230 (60.026183204 sec)|10355 (60.015584914 sec)|2087 (60.00319069 sec)|

*After fix (records consumed)*
||Partitions paused / maxPollRecords||10||5||1||
|0|8662740 (60.011527576 sec)|8203445 (60.022204036 sec)|5846512 (60.0168916 sec)|
|2|8257390 (60.011121061 sec)|7776150 (60.01620875 sec)|5269557 (60.022581248 sec)|
|4|7938510 (60.011829002 sec)|7510140 (60.017571391 sec)|5213496 (60.000230139 sec)|
|6|7100970 (60.007220465 sec)|6382845 (60.038580526 sec)|4519645 (60.48034 sec)|
|8|6799956 (60.001850171 sec)|6482421 (60.001997219 sec)|4383300 (60.4836 sec)|
|9|7045177 (60.035366096 sec)|6465839 (60.41961 sec)|4884693 (60.42054 sec)|
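
For readers unfamiliar with the API, a minimal hedged sketch of the pause/resume pattern the description refers to (the topic name and the processing/backpressure helpers are hypothetical):

{noformat}
consumer.subscribe(Collections.singletonList("high-volume-topic"));
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    process(records);                           // hypothetical handler
    if (downstreamBackpressured())              // hypothetical check
        consumer.pause(consumer.assignment());  // stop delivery for now
    else if (!consumer.paused().isEmpty())
        consumer.resume(consumer.paused());     // return to the paused partitions
    // Without the fix, fetches already buffered for paused partitions are
    // dropped on the next poll() and must be re-fetched after resume().
}
{noformat}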



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7547) Avoid relogin in kafka if connection is already established.

2018-10-25 Thread Pradeep Bansal (JIRA)
Pradeep Bansal created KAFKA-7547:
-

 Summary: Avoid relogin in kafka if connection is already 
established.
 Key: KAFKA-7547
 URL: https://issues.apache.org/jira/browse/KAFKA-7547
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Pradeep Bansal


I am new to Kafka, and there may already be ways to do what I need; I didn't 
find one so far, so I thought I would post it here.

Currently, I observe that Kafka periodically tries to renew the Kerberos token 
using the kinit -R command. I found that I can set 
sasl.kerberos.min.time.before.relogin and raise the default from 1 minute to at 
most 1 day. But in my case I am not clear on why the renewal is even required.

 

If it is not really required, is there a way to turn it off?
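
For reference, a hedged sketch of the relevant setting (the value is illustrative; the property is in milliseconds and defaults to 60000, i.e. 1 minute):

{noformat}
# Sleep time for the Kerberos login refresh thread between refresh attempts.
sasl.kerberos.min.time.before.relogin=86400000
{noformat}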



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7546) Java implementation for Authorizer

2018-10-25 Thread Pradeep Bansal (JIRA)
Pradeep Bansal created KAFKA-7546:
-

 Summary: Java implementation for Authorizer
 Key: KAFKA-7546
 URL: https://issues.apache.org/jira/browse/KAFKA-7546
 Project: Kafka
  Issue Type: Improvement
  Components: security
Reporter: Pradeep Bansal


I am using Kafka with authentication and authorization. I want to plug in my 
own implementation of Authorizer which doesn't use ZooKeeper but instead keeps 
its permission mappings in a SQL database. Is it possible to write the 
Authorizer code in Java?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)