[jira] [Commented] (KAFKA-6870) Concurrency conflicts in SampledStat

2018-05-08 Thread Chia-Ping Tsai (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468303#comment-16468303
 ] 

Chia-Ping Tsai commented on KAFKA-6870:
---

After reading the Sensor and KafkaMetric code, I feel we should make 
KafkaMetric#measurableValue thread-safe to resolve this issue. We already pass a 
lock to KafkaMetric, and KafkaMetric should use that lock when getting the 
measured value from metricValueProvider. [~ijuma]  [~rsivaram] Any 
suggestions? 
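
For illustration, a minimal sketch of that idea, assuming KafkaMetric keeps the lock object handed to it by Metrics/Sensor (this is a sketch of the proposal, not the final patch):

{code:java}
// Sketch only: taking the shared lock while reading means the read cannot
// interleave with a Sensor.record() that is mutating the underlying SampledStat.
public double measurableValue(long timeMs) {
    synchronized (this.lock) {
        if (this.metricValueProvider instanceof Measurable)
            return ((Measurable) this.metricValueProvider).measure(config, timeMs);
        else
            return 0;
    }
}
{code}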

> Concurrency conflicts in SampledStat
> 
>
> Key: KAFKA-6870
> URL: https://issues.apache.org/jira/browse/KAFKA-6870
> Project: Kafka
>  Issue Type: Bug
>Reporter: Chia-Ping Tsai
>Priority: Major
> Fix For: 2.0.0, 1.1.1
>
>
> The samples stored in SampledStat are not thread-safe. However, the 
> ReplicaFetcherThreads used to replicate from specific brokers may update the 
> samples (when the list is empty, a new sample is added) and iterate over them 
> concurrently, which then causes a ConcurrentModificationException.
> {code:java}
> [2018-05-03 13:50:56,087] ERROR [ReplicaFetcher replicaId=106, leaderId=100, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread:76)
> java.util.ConcurrentModificationException
> at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:909)
> at java.util.ArrayList$Itr.next(ArrayList.java:859)
> at 
> org.apache.kafka.common.metrics.stats.Rate$SampledTotal.combine(Rate.java:132)
> at 
> org.apache.kafka.common.metrics.stats.SampledStat.measure(SampledStat.java:78)
> at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:66)
> at 
> org.apache.kafka.common.metrics.KafkaMetric.measurableValue(KafkaMetric.java:85)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:201)
> at org.apache.kafka.common.metrics.Sensor.checkQuotas(Sensor.java:192)
> at 
> kafka.server.ReplicationQuotaManager.isQuotaExceeded(ReplicationQuotaManager.scala:104)
> at 
> kafka.server.ReplicaFetcherThread.kafka$server$ReplicaFetcherThread$$shouldFollowerThrottle(ReplicaFetcherThread.scala:384)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:263)
> at 
> kafka.server.ReplicaFetcherThread$$anonfun$buildFetchRequest$1.apply(ReplicaFetcherThread.scala:261)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaFetcherThread.buildFetchRequest(ReplicaFetcherThread.scala:261)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:102)
> at 
> kafka.server.AbstractFetcherThread$$anonfun$2.apply(AbstractFetcherThread.scala:101)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:250)
> at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:101)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {code}
> Before 
> [https://github.com/apache/kafka/commit/d734f4e56d276f84b8c52b602edd67d41cbb6c35]
>  the ConcurrentModificationException did not occur, since the only change made 
> to the samples was an "add". Using get(index) avoids the 
> ConcurrentModificationException.
> In short, we can either make samples thread-safe, or replace the foreach 
> loop with get(index) if we have concerns about the performance of a thread-safe 
> list (see the sketch below)...
>  
>  
>  
>  
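
For illustration, a minimal sketch of the get(index) alternative mentioned in the description above, assuming the Sample objects expose their value field as in the current SampledStat code (sketch only, not the committed fix):

{code:java}
// Sketch only: index-based access never creates an Iterator, so a concurrent
// "add" of a new sample cannot trigger a ConcurrentModificationException; a
// sample appended after size() was read is simply picked up on the next measure.
@Override
public double combine(List<SampledStat.Sample> samples, MetricConfig config, long now) {
    double total = 0.0;
    for (int i = 0; i < samples.size(); i++)
        total += samples.get(i).value;
    return total;
}
{code}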



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6761) Reduce Kafka Streams Footprint

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6761?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468244#comment-16468244
 ] 

ASF GitHub Bot commented on KAFKA-6761:
---

bbejeck opened a new pull request #4983: KAFKA-6761 [WIP]- reduce streams 
footprint part II
URL: https://github.com/apache/kafka/pull/4983
 
 
   This version is a WIP and intentionally leaves out some additional required 
changes to keep the reviewing effort more manageable. This version of the 
PR includes:
   
   1. Cleaning up the graph objects to reduce the number of parameters and make 
the naming conventions clearer.
   2. Intercepting all calls to the `InternalTopologyBuilder` and capturing all 
details required for possible optimizations and for building the final topology.
   
   
   This PR does not include writing out the current physical plan, so no tests are 
included. The next PR will include additional changes for building the graph 
and writing the topology out without optimizations, using the current streams 
tests.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce Kafka Streams Footprint
> --
>
> Key: KAFKA-6761
> URL: https://issues.apache.org/jira/browse/KAFKA-6761
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Bill Bejeck
>Priority: Major
> Fix For: 2.0.0
>
>
> The persistent storage footprint of a Kafka Streams application consists of the 
> following aspects:
>  # The internal topics created on the Kafka cluster side.
>  # The materialized state stores on the Kafka Streams application instance 
> side.
> There have been some questions about reducing these footprints, especially 
> since many of them are not necessary. For example, there are redundant 
> internal topics, as well as unnecessary state stores that take up space and 
> also hurt performance. When people push Streams to production with 
> high traffic, this issue becomes more common and severe. Reducing the 
> footprint of Streams has clear benefits: lower resource utilization of 
> Kafka Streams applications, and less pressure on the brokers' capacity.





[jira] [Commented] (KAFKA-6786) Remove additional configs in StreamsBrokerDownResilienceTest

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468243#comment-16468243
 ] 

ASF GitHub Bot commented on KAFKA-6786:
---

abhishek7 opened a new pull request #4982: KAFKA-6786
URL: https://github.com/apache/kafka/pull/4982
 
 
   First iteration of KAFKA-6786. Currently a WIP as this is my first time 
contributing; I want to make sure that I implemented this correctly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> Remove additional configs in StreamsBrokerDownResilienceTest
> 
>
> Key: KAFKA-6786
> URL: https://issues.apache.org/jira/browse/KAFKA-6786
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Abhishek Sharma
>Priority: Minor
>  Labels: newbie
>
> Since we are now passing a property file into the streams service 
> initialization code, we do not need to pass those configs as additional 
> properties in StreamsBrokerDownResilienceTest. We can refactor this test to 
> get rid of the additional properties.





[jira] [Commented] (KAFKA-6877) Remove completedFetch upon a failed parse if it contains no records.

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468221#comment-16468221
 ] 

ASF GitHub Bot commented on KAFKA-6877:
---

becketqin closed pull request #4974: KAFKA-6877; Remove completedFetch upon a 
failed parse if it contains no records.
URL: https://github.com/apache/kafka/pull/4974
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 6d8fb6c2466..b3791ffb514 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -472,6 +472,7 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp
  * @return The fetched records per partition
  * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
  * the defaultResetPolicy is NONE
+ * @throws TopicAuthorizationException If there is TopicAuthorization error in fetchResponse.
  */
 public Map>> fetchedRecords() {
 Map>> fetched = new HashMap<>();
@@ -483,7 +484,20 @@ private ListOffsetResult fetchOffsetsByTimes(Map timestamp
 CompletedFetch completedFetch = completedFetches.peek();
 if (completedFetch == null) break;
 
-nextInLineRecords = parseCompletedFetch(completedFetch);
+try {
+nextInLineRecords = parseCompletedFetch(completedFetch);
+} catch (Exception e) {
+// Remove a completedFetch upon a parse with exception if (1) it contains no records, and
+// (2) there are no fetched records with actual content preceding this exception.
+// The first condition ensures that the completedFetches is not stuck with the same completedFetch
+// in cases such as the TopicAuthorizationException, and the second condition ensures that no
+// potential data loss due to an exception in a following record.
+FetchResponse.PartitionData partition = completedFetch.partitionData;
+if (fetched.isEmpty() && (partition.records == null || partition.records.sizeInBytes() == 0)) {
+completedFetches.poll();
+}
+throw e;
+}
 completedFetches.poll();
 } else {
 List> records = fetchRecords(nextInLineRecords, recordsRemaining);
@@ -945,7 +959,7 @@ private PartitionRecords parseCompletedFetch(CompletedFetch completedFetch) {
 this.metadata.requestUpdate();
 } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
 if (fetchOffset != subscriptions.position(tp)) {
-log.debug("Discarding stale fetch response for partition {} since the fetched offset {}" +
+log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
 "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
 } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
 log.info("Fetch offset {} is out of range for partition 
{}, resetting offset", fetchOffset, tp);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 6de11866ec4..7157470b760 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -59,6 +59,7 @@
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.SimpleRecord;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.AbstractRequest;
@@ -117,6 +118,8 @@
 private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics";
 private TopicPartition tp0 = new TopicPartition(topicName, 0);
 

[jira] [Resolved] (KAFKA-6292) KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream

2018-05-08 Thread Jason Gustafson (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6292.

Resolution: Fixed

> KafkaConsumer ran into Unknown error fetching data for topic-partition caused 
> by integer overflow in FileLogInputStream 
> 
>
> Key: KAFKA-6292
> URL: https://issues.apache.org/jira/browse/KAFKA-6292
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.11.0.0, 0.11.0.1, 1.0.0, 0.11.0.2
> Environment: OS:Red Hat Enterprise Linux Server release 7.3 (Maipo)
> Kafka: kafka_2.12-0.11.0.0
> JDK: jdk1.8.0_121
>Reporter: Terence Yi
>Priority: Major
>  Labels: easyfix, reliability
> Fix For: 2.0.0
>
>
> Steps to reproduce:
> * Broker config to reproduce this bug:
> {code:java}
>   # The maximum size of a log segment file. When this size is reached a new 
> log segment will be created.
> #2G
> log.segment.bytes=2147483647
> {code}
> * Setups:
> producer sends messages constantly. 
> consumer keeps polling.
> topic has 1 partition and replication factor 1.
> min.insync.replicas=1
> producer has "acks=all"
> consumer has default "enable.auto.commit=false"
> consumer manually commits offsets with commitSync after handling messages.
> Kafka runs standalone.
> * Observe the log on the consumer side (for me, after running for 12 hours)
> {code:java}
> 2017-12-18 07:11:01.013 WARN sep105v1 
> [app-consumer-subscription-pool-4-thread-20] 
> org.apache.kafka.clients.consumer.internals.Fetcher {} Unknown error fetching 
> data for topic-partition DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0
> {code}
> * Observe server.log in Kafka/logs
> {code:java}
> [2017-12-14 04:52:21,144] ERROR [Replica Manager on Broker 3]: Error 
> processing fetch operation on partition 
> DDI.DISPATCHER.P_TVIN.W_SL.P_appx.P_ul.P_pos-0, offset 4043314339 
> (kafka.server.ReplicaManager)
> org.apache.kafka.common.KafkaException: java.io.EOFException: Failed to read 
> `log header` from file channel `sun.nio.ch.FileChannelImpl@5604ea91`. 
> Expected to read 17 bytes, but reached end of file after reading 0 bytes. 
> Started read from position 2147483643.
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:40)
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:24)
> at 
> org.apache.kafka.common.utils.AbstractIterator.maybeComputeNext(AbstractIterator.java:79)
> at 
> org.apache.kafka.common.utils.AbstractIterator.hasNext(AbstractIterator.java:45)
> at 
> org.apache.kafka.common.record.FileRecords.searchForOffsetWithSize(FileRecords.java:279)
> at kafka.log.LogSegment.translateOffset(LogSegment.scala:176)
> at kafka.log.LogSegment.read(LogSegment.scala:228)
> at kafka.log.Log.read(Log.scala:938)
> at kafka.server.ReplicaManager.read$1(ReplicaManager.scala:719)
> at 
> kafka.server.ReplicaManager.$anonfun$readFromLocalLog$6(ReplicaManager.scala:780)
> at 
> scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:59)
> at 
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:52)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at 
> kafka.server.ReplicaManager.readFromLocalLog(ReplicaManager.scala:779)
> at kafka.server.ReplicaManager.fetchMessages(ReplicaManager.scala:617)
> at kafka.server.KafkaApis.handleFetchRequest(KafkaApis.scala:615)
> at kafka.server.KafkaApis.handle(KafkaApis.scala:98)
> at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:66)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.EOFException: Failed to read `log header` from file 
> channel `sun.nio.ch.FileChannelImpl@5604ea91`. Expected to read 17 bytes, but 
> reached end of file after reading 0 bytes. Started read from position 
> 2147483643.
> at org.apache.kafka.common.utils.Utils.readFullyOrFail(Utils.java:751)
> at 
> org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:66)
> at 
> org.apache.kafka.common.record.FileLogInputStream.nextBatch(FileLogInputStream.java:40)
> at 
> org.apache.kafka.common.record.RecordBatchIterator.makeNext(RecordBatchIterator.java:35)
> ... 18 more
> {code}
> * Impact:
> # After the EOF exception occurs, the consumer fails to consume the remaining 
> messages.
> # After the segment log files which caused the EOF exception have been deleted 
> by the log cleaner thread, the consumer recovers and consumes messages again.
> # There is no impact from the producer's point of view.
> * Analysis:
> # Kafka log file list:
> 

[jira] [Commented] (KAFKA-6292) KafkaConsumer ran into Unknown error fetching data for topic-partition caused by integer overflow in FileLogInputStream

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6292?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468133#comment-16468133
 ] 

ASF GitHub Bot commented on KAFKA-6292:
---

hachikuji closed pull request #4928: KAFKA-6292: Improved FileLogInputStream 
batch position checks in order to avoid type overflow related errors
URL: https://github.com/apache/kafka/pull/4928
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java 
b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index a1e3a2f6541..92e8864a183 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -60,7 +60,7 @@
 @Override
 public FileChannelRecordBatch nextBatch() throws IOException {
 FileChannel channel = fileRecords.channel();
-if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
+if (position >= end - HEADER_SIZE_UP_TO_MAGIC)
 return null;
 
 logHeaderBuffer.rewind();
@@ -75,7 +75,7 @@ public FileChannelRecordBatch nextBatch() throws IOException {
 throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
 "overhead (%d) in file %s.", size, LegacyRecord.RECORD_OVERHEAD_V0, fileRecords.file()));
 
-if (position + LOG_OVERHEAD + size > end)
+if (position > end - LOG_OVERHEAD - size)
 return null;
 
 byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
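
As a hedged illustration of why the reordered comparisons above matter (positions are int values here, and the numbers are taken from the server log in the issue description):

{code:java}
// With log.segment.bytes = Integer.MAX_VALUE a batch can start near the end of
// the segment, and the old left-hand-side addition overflows int.
int end = Integer.MAX_VALUE;        // 2147483647, the configured segment size
int position = 2147483643;          // "Started read from position 2147483643"
int headerSizeUpToMagic = 17;       // "Expected to read 17 bytes"

// Old check: the sum wraps to -2147483636, the check passes, the read proceeds and hits EOF.
System.out.println(position + headerSizeUpToMagic >= end);   // false
// New check: the subtraction cannot overflow here, so nextBatch() correctly returns null.
System.out.println(position >= end - headerSizeUpToMagic);   // true
{code}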
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 95b2a0c89c6..77aaae86f5f 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -112,8 +112,8 @@ public void testBatchIterationWithMultipleRecordsPerBatch() 
throws IOException {
 SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
 new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
 new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
 };
+
 SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
 new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
 new SimpleRecord(897839L, null, "4".getBytes()),
@@ -152,8 +152,8 @@ public void testBatchIterationV2() throws IOException {
 SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
 new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
 new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
-
 };
+
 SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
 new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
 new SimpleRecord(897839L, null, "4".getBytes()),
@@ -204,6 +204,22 @@ public void testBatchIterationIncompleteBatch() throws 
IOException {
 }
 }
 
+@Test
+public void testNextBatchSelectionWithMaxedParams() throws IOException {
+try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, Integer.MAX_VALUE, Integer.MAX_VALUE);
+assertNull(logInputStream.nextBatch());
+}
+}
+
+@Test
+public void testNextBatchSelectionWithZeroedParams() throws IOException {
+try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, 0);
+assertNull(logInputStream.nextBatch());
+}
+}
+
 private void assertProducerData(RecordBatch batch, long producerId, short 
producerEpoch, int baseSequence,
 boolean isTransactional, SimpleRecord ... 
records) {
 assertEquals(producerId, batch.producerId());


 




> KafkaConsumer ran into Unknown error fetching data for topic-partition caused 
> by integer overflow in FileLogInputStream 
> 

[jira] [Commented] (KAFKA-6879) Controller deadlock following session expiration

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6879?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16468075#comment-16468075
 ] 

ASF GitHub Bot commented on KAFKA-6879:
---

ijuma closed pull request #4977: KAFKA-6879; Invoke session init callbacks 
outside lock to avoid deadlock
URL: https://github.com/apache/kafka/pull/4977
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index bc721e39f96..d3d1a81c41f 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -160,7 +160,10 @@ class KafkaController(val config: KafkaConfig, zkClient: 
KafkaZkClient, time: Ti
   override def beforeInitializingSession(): Unit = {
 val expireEvent = new Expire
 eventManager.clearAndPut(expireEvent)
-expireEvent.waitUntilProcessed()
+
+// Block initialization of the new session until the expiration event is being handled,
+// which ensures that all pending events have been processed before creating the new session
+expireEvent.waitUntilProcessingStarted()
   }
 })
 eventManager.put(Startup)
@@ -1518,17 +1521,17 @@ class KafkaController(val config: KafkaConfig, 
zkClient: KafkaZkClient, time: Ti
 
   // We can't make this a case object due to the countDownLatch field
   class Expire extends ControllerEvent {
-private val countDownLatch = new CountDownLatch(1)
+private val processingStarted = new CountDownLatch(1)
 override def state = ControllerState.ControllerChange
 
 override def process(): Unit = {
-  countDownLatch.countDown()
+  processingStarted.countDown()
   activeControllerId = -1
   onControllerResignation()
 }
 
-def waitUntilProcessed(): Unit = {
-  countDownLatch.await()
+def waitUntilProcessingStarted(): Unit = {
+  processingStarted.await()
 }
   }
 
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala 
b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 5407934e987..24eb17770ec 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -99,11 +99,15 @@ class KafkaScheduler(val threads: Int,
 }
   }
 
-  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) {
+  def scheduleOnce(name: String, fun: () => Unit): Unit = {
+schedule(name, fun, delay = 0L, period = -1L, unit = TimeUnit.MILLISECONDS)
+  }
+
+  def schedule(name: String, fun: () => Unit, delay: Long, period: Long, unit: TimeUnit) {
  debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
 this synchronized {
-  ensureRunning
+  ensureRunning()
   val runnable = CoreUtils.runnable {
 try {
   trace("Beginning execution of scheduled task '%s'.".format(name))
diff --git a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala 
b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
index 5c4cd685565..5cb127c3c7e 100755
--- a/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
+++ b/core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
@@ -59,7 +59,7 @@ class ZooKeeperClient(connectString: String,
  private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, ZNodeChildChangeHandler]().asScala
  private val inFlightRequests = new Semaphore(maxInFlightRequests)
  private val stateChangeHandlers = new ConcurrentHashMap[String, StateChangeHandler]().asScala
-  private[zookeeper] val expiryScheduler = new KafkaScheduler(0, "zk-session-expiry-handler")
+  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, "zk-session-expiry-handler")
 
   private val metricNames = Set[String]()
 
@@ -325,43 +325,65 @@ class ZooKeeperClient(connectString: String,
 zooKeeper
   }
   
-  private def initialize(): Unit = {
-if (!connectionState.isAlive) {
-  zooKeeper.close()
-  info(s"Initializing a new session to $connectString.")
-  // retry forever until ZooKeeper can be instantiated
-  var connected = false
-  while (!connected) {
-try {
-  zooKeeper = new ZooKeeper(connectString, sessionTimeoutMs, ZooKeeperClientWatcher)
-  connected = true
-} catch {
-  case e: Exception =>
-info("Error when recreating ZooKeeper, retrying after a short sleep", e)
-Thread.sleep(1000)
+  private def reinitialize(): Unit = {
+// 

[jira] [Created] (KAFKA-6886) Externalize Secrets for Kafka Connect Configurations

2018-05-08 Thread Robert Yokota (JIRA)
Robert Yokota created KAFKA-6886:


 Summary: Externalize Secrets for Kafka Connect Configurations
 Key: KAFKA-6886
 URL: https://issues.apache.org/jira/browse/KAFKA-6886
 Project: Kafka
  Issue Type: New Feature
  Components: KafkaConnect
Reporter: Robert Yokota
Assignee: Robert Yokota
 Fix For: 2.0.0


Kafka Connect's connector configurations have plaintext passwords, and Connect 
stores these in cleartext either on the filesystem (for standalone mode) or in 
internal topics (for distributed mode). 

Connect should not store or transmit cleartext passwords in connector 
configurations. First, secrets in stored connector configurations should be 
replaceable with references to values stored in external secret management 
systems; Connect should provide an extension point for adding customized 
integrations, as well as a file-based extension as an example. Second, 
a Connect runtime should be configurable to use one or more of 
these extensions, and connector configurations should be able to use placeholders 
that the runtime resolves before passing the complete connector 
configuration to the connector. This way, existing connectors will not see 
any difference in the configurations that Connect provides to them at startup. 
And third, Connect's API should be changed to allow a connector to obtain the 
latest connector configuration at any time.
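
For illustration only, a purely hypothetical sketch of what such an externalized reference could look like from a connector configuration's point of view; the placeholder syntax and the "vault" provider name are invented for this example and are not part of the proposal:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical example: the stored/transmitted configuration contains only the
// reference, and the Connect runtime would resolve it via a pluggable extension
// before handing the complete configuration to the connector.
Map<String, String> connectorConfig = new HashMap<>();
connectorConfig.put("name", "jdbc-sink");
connectorConfig.put("connection.url", "jdbc:postgresql://db.example.com/orders");
connectorConfig.put("connection.password", "${vault:connect/jdbc:password}");
{code}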







[jira] [Commented] (KAFKA-6885) DescribeConfigs synonyms are identical to parent entry for BROKER resources

2018-05-08 Thread Manikumar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467896#comment-16467896
 ] 

Manikumar commented on KAFKA-6885:
--

In the case of topics, there is a difference between the names of topic override 
configs and server default configs (e.g. "max.message.bytes" is the topic override 
config name for the "message.max.bytes" server property). In this case the synonym 
name will be different.

But in the case of a broker there is no name difference between dynamic broker 
configs and server default configs (in both places we use the "message.max.bytes" 
config name). We can differentiate them by using the synonym's source property 
(DEFAULT_CONFIG, STATIC_BROKER_CONFIG, etc.).
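
For reference, a small sketch of how a client can inspect those sources with the 1.1 Java AdminClient (broker id and bootstrap address are assumptions for the example, and exception handling is omitted):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.common.config.ConfigResource;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    ConfigResource broker = new ConfigResource(ConfigResource.Type.BROKER, "0");
    DescribeConfigsOptions options = new DescribeConfigsOptions().includeSynonyms(true);
    Config config = admin.describeConfigs(Collections.singleton(broker), options).all().get().get(broker);
    ConfigEntry entry = config.get("message.max.bytes");
    // For a BROKER resource the synonym names are all "message.max.bytes";
    // source() is what distinguishes DYNAMIC_BROKER_CONFIG, STATIC_BROKER_CONFIG
    // and DEFAULT_CONFIG entries.
    for (ConfigEntry.ConfigSynonym synonym : entry.synonyms())
        System.out.println(synonym.name() + " = " + synonym.value() + " (" + synonym.source() + ")");
}
{code}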



> DescribeConfigs synonyms are identical to parent entry for BROKER 
> resources
> ---
>
> Key: KAFKA-6885
> URL: https://issues.apache.org/jira/browse/KAFKA-6885
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Affects Versions: 1.1.0
>Reporter: Magnus Edenhill
>Priority: Major
>
> The DescribeConfigs protocol response for BROKER resources returns synonyms 
> for various configuration entries, such as "log.dir".
> The list of synonyms returned is identical to their parent configuration 
> entry, rather than the actual synonyms.
> For example, for the broker "log.dir" config entry it returns one synonym, 
> also named "log.dir" rather than "log.dirs" or whatever the synonym is 
> supposed to be.
>  
> This does not seem to happen for TOPIC resources.





[jira] [Commented] (KAFKA-6264) Log cleaner thread may die on legacy segment containing messages whose offsets are too large

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467795#comment-16467795
 ] 

ASF GitHub Bot commented on KAFKA-6264:
---

dhruvilshah3 closed pull request #4692: KAFKA-6264: Log cleaner thread may die 
on legacy segment containing messages whose offsets are too large
URL: https://github.com/apache/kafka/pull/4692
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala 
b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 44083c186c8..4377c6f32a6 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -49,7 +49,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, 
val baseOffset: Lon
   protected val lock = new ReentrantLock
 
   @volatile
-  protected var mmap: MappedByteBuffer = {
+  protected[log] var mmap: MappedByteBuffer = {
 val newlyCreated = file.createNewFile()
 val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
 try {
@@ -88,7 +88,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, 
val baseOffset: Lon
 
   /** The number of entries in this index */
   @volatile
-  protected var _entries = mmap.position() / entrySize
+  protected[log] var _entries = mmap.position() / entrySize
 
   /**
* True iff there are no more slots available in this index
diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index cc693375079..39871872a63 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -351,7 +351,10 @@ class Log(@volatile var dir: File,
   time = time,
   fileAlreadyExists = true)
 
-try segment.sanityCheck(timeIndexFileNewlyCreated)
+try {
+  segment.sanityCheck(timeIndexFileNewlyCreated)
+  segment.initializeTimestampMetadata()
+}
 catch {
   case _: NoSuchFileException =>
error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
@@ -1120,15 +1123,15 @@ class Log(@volatile var dir: File,
  s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
  s"required version $KAFKA_0_10_0_IV0")
 
-  // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
-  // constant time access while being safe to use with concurrent collections unlike `toArray`.
-  val segmentsCopy = logSegments.toBuffer
   // For the earliest and latest, we do not need to return the timestamp.
   if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
 return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))
   else if (targetTimestamp == ListOffsetRequest.LATEST_TIMESTAMP)
 return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logEndOffset))
 
+  // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+  // constant time access while being safe to use with concurrent collections unlike `toArray`.
+  val segmentsCopy = logSegments.toBuffer
   val targetSeg = {
 // Get all the segments whose largest timestamp is smaller than target timestamp
 val earlierSegs = segmentsCopy.takeWhile(_.largestTimestamp < targetTimestamp)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 0ee994252a1..c552d82710f 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -502,7 +502,9 @@ private[log] class Cleaner(val id: Int,
 try {
   // clean segments into the new destination segment
   val iter = segments.iterator
+  val numSegments = segments.length
   var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
+
   while (currentSegmentOpt.isDefined) {
 val currentSegment = currentSegmentOpt.get
 val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
@@ -516,7 +518,7 @@ private[log] class Cleaner(val id: Int,
 info(s"Cleaning segment $startOffset in log ${log.name} (largest timestamp ${new Date(currentSegment.largestTimestamp)}) " +
  s"into ${cleaned.baseOffset}, ${if(retainDeletes) "retaining" else "discarding"} deletes.")
 cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletes, log.config.maxMessageSize,
-  transactionMetadata, log.activeProducersWithLastSequence, stats)
+  transactionMetadata, 

[jira] [Created] (KAFKA-6885) DescribeConfigs synonyms are identical to parent entry for BROKER resources

2018-05-08 Thread Magnus Edenhill (JIRA)
Magnus Edenhill created KAFKA-6885:
--

 Summary: DescribeConfigs synonyms are identical to parent entry for BROKER resources
 Key: KAFKA-6885
 URL: https://issues.apache.org/jira/browse/KAFKA-6885
 Project: Kafka
  Issue Type: Bug
  Components: admin
Affects Versions: 1.1.0
Reporter: Magnus Edenhill


The DescribeConfigs protocol response for BROKER resources returns synonyms for 
various configuration entries, such as "log.dir".

The list of synonyms returned is identical to their parent configuration 
entry, rather than the actual synonyms.

For example, for the broker "log.dir" config entry it returns one synonym, also 
named "log.dir" rather than "log.dirs" or whatever the synonym is supposed to 
be.

 

This does not seem to happen for TOPIC resources.





[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state

2018-05-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467708#comment-16467708
 ] 

ASF GitHub Bot commented on KAFKA-6878:
---

tedyu opened a new pull request #4978: KAFKA-6878 NPE when querying global 
state store not in READY state
URL: https://github.com/apache/kafka/pull/4978
 
 
   Check whether cache is null before retrieving from cache.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   




> NPE when querying global state store not in READY state
> ---
>
> Key: KAFKA-6878
> URL: https://issues.apache.org/jira/browse/KAFKA-6878
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Salazar
>Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after 
> the application is started we get a 
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after 
> start) we get an answer but before then we get NPEs.
>  
> Looks like there isn't a check to ensure `initInternal` actually sets the 
> cache before you're allowed to use it in the get method
>  
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at 
> io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}





[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state

2018-05-08 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467700#comment-16467700
 ] 

Matthias J. Sax commented on KAFKA-6878:


Feel free to assign the Jira to yourself and open a PR [~yuzhih...@gmail.com]

> NPE when querying global state store not in READY state
> ---
>
> Key: KAFKA-6878
> URL: https://issues.apache.org/jira/browse/KAFKA-6878
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Salazar
>Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after 
> the application is started we get a 
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after 
> start) we get an answer but before then we get NPEs.
>  
> Looks like there isn't a check to ensure `initInternal` actually sets the 
> cache before you're allowed to use it in the get method
>  
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at 
> io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}





[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state

2018-05-08 Thread Ted Yu (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467689#comment-16467689
 ] 

Ted Yu commented on KAFKA-6878:
---

How about this:
{code}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueSto
index 45f606f..d1080f8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -163,6 +163,9 @@ class CachingKeyValueStore extends 
WrappedStateStore.AbstractStateStore im
 }

 private byte[] getInternal(final Bytes key) {
+if (cache == null) {
+return null;
+}
 final LRUCacheEntry entry = cache.get(cacheName, key);
 if (entry == null) {
 final byte[] rawValue = underlying.get(key);
{code}

> NPE when querying global state store not in READY state
> ---
>
> Key: KAFKA-6878
> URL: https://issues.apache.org/jira/browse/KAFKA-6878
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Salazar
>Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after 
> the application is started we get a 
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after 
> start) we get an answer but before then we get NPEs.
>  
> Looks like there isn't a check to ensure `initInternal` actually sets the 
> cache before you're allowed to use it in the get method
>  
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at 
> io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}





[jira] [Issue Comment Deleted] (KAFKA-6878) NPE when querying global state store not in READY state

2018-05-08 Thread Ted Yu (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated KAFKA-6878:
--
Comment: was deleted

(was: How about the following change ?
{code}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKe
index 45f606f..1ceb13b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -163,6 +163,9 @@ class CachingKeyValueStore extends 
WrappedStateStore.AbstractStateStore im
 }

 private byte[] getInternal(final Bytes key) {
+if (key == null) {
+return null;
+}
 final LRUCacheEntry entry = cache.get(cacheName, key);
 if (entry == null) {
 final byte[] rawValue = underlying.get(key);
{code}
Client can check against null and re-poll if null is encountered.)

> NPE when querying global state store not in READY state
> ---
>
> Key: KAFKA-6878
> URL: https://issues.apache.org/jira/browse/KAFKA-6878
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Salazar
>Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after 
> the application is started we get a 
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after 
> start) we get an answer but before then we get NPEs.
>  
> Looks like there isn't a check to ensure `initInternal` actually sets the 
> cache before you're allowed to use it in the get method
>  
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at 
> io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}





[jira] [Commented] (KAFKA-6878) NPE when querying global state store not in READY state

2018-05-08 Thread Salazar (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467676#comment-16467676
 ] 

Salazar commented on KAFKA-6878:


Hi Ted,

We did verify that the key is not null, and the top-level class 
CompositeReadOnlyKeyValueStore already has an Objects.nonNull assertion in it 
that appropriately throws an NPE (as documented). In this scenario the cache 
is actually null because initInternal has not completed/been called before 
getInternal was called.

> NPE when querying global state store not in READY state
> ---
>
> Key: KAFKA-6878
> URL: https://issues.apache.org/jira/browse/KAFKA-6878
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Salazar
>Priority: Major
>
> Info: using kafka 1.1.0 - confluent 4.1.0
> We're trying to query a global state store, but if we query too quickly after 
> the application is started we get a 
> NullPointerException(CachingKeyValueStore.java:166).
> We have verified the key is nonNull.
> If we wait long enough (with our current amount of data 5 minutes after 
> start) we get an answer but before then we get NPEs.
>  
> Looks like there isn't a check to ensure `initInternal` actually sets the 
> cache before you're allowed to use it in the get method
>  
> stacktrace:
> {code:java}
> java.lang.NullPointerException: null
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:166)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:159)
> at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:38)
> at 
> org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.get(InnerMeteredKeyValueStore.java:183)
> at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.get(MeteredKeyValueBytesStore.java:112)
> at 
> org.apache.kafka.streams.state.internals.CompositeReadOnlyKeyValueStore.get(CompositeReadOnlyKeyValueStore.java:55)
> at 
> io.repository.VisitRepositoryKafka.findByVisitTrackingId(VisitRepositoryKafka.java:76)
> {code}





[jira] [Created] (KAFKA-6884) ConsumerGroupCommand should use new AdminClient

2018-05-08 Thread Jason Gustafson (JIRA)
Jason Gustafson created KAFKA-6884:
--

 Summary: ConsumerGroupCommand should use new AdminClient
 Key: KAFKA-6884
 URL: https://issues.apache.org/jira/browse/KAFKA-6884
 Project: Kafka
  Issue Type: Improvement
Reporter: Jason Gustafson


Now that we have KIP-222, we should update consumer-groups.sh to use the new 
AdminClient.





[jira] [Commented] (KAFKA-6883) KafkaShortnamer should allow to convert Kerberos principal name to upper case user name

2018-05-08 Thread Attila Sasvari (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467207#comment-16467207
 ] 

Attila Sasvari commented on KAFKA-6883:
---

HADOOP-13984 is a similar issue.

> KafkaShortnamer should allow to convert Kerberos principal name to upper case 
> user name
> ---
>
> Key: KAFKA-6883
> URL: https://issues.apache.org/jira/browse/KAFKA-6883
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Attila Sasvari
>Priority: Major
>
> KAFKA-5764 implemented support to convert Kerberos principal name to lower 
> case Linux user name via auth_to_local rules. 
> As a follow-up, KafkaShortnamer could be further extended to allow converting 
> principal names to uppercase by appending /U to the rule.





[jira] [Created] (KAFKA-6883) KafkaShortnamer should allow to convert Kerberos principal name to upper case user name

2018-05-08 Thread Attila Sasvari (JIRA)
Attila Sasvari created KAFKA-6883:
-

 Summary: KafkaShortnamer should allow to convert Kerberos 
principal name to upper case user name
 Key: KAFKA-6883
 URL: https://issues.apache.org/jira/browse/KAFKA-6883
 Project: Kafka
  Issue Type: Improvement
Reporter: Attila Sasvari


KAFKA-5764 implemented support to convert Kerberos principal name to lower case 
Linux user name via auth_to_local rules. 

As a follow-up, KafkaShortnamer could be further extended to allow converting 
principal names to uppercase by appending /U to the rule.
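
As a hedged illustration (assuming a broker Properties object named props; the /L form exists today from KAFKA-5764, while the /U form below is the proposed, not yet existing, syntax):

{code:java}
// Existing behaviour (KAFKA-5764): a trailing "/L" lower-cases the translated name.
props.put("sasl.kerberos.principal.to.local.rules",
          "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//L,DEFAULT");

// Proposed here (hypothetical until implemented): a trailing "/U" would upper-case it.
props.put("sasl.kerberos.principal.to.local.rules",
          "RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//U,DEFAULT");
{code}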





[jira] [Commented] (KAFKA-1614) Partition log directory name and segments information exposed via JMX

2018-05-08 Thread Matthias Meerhof (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467164#comment-16467164
 ] 

Matthias Meerhof commented on KAFKA-1614:
-

I second [~cmpsoares] request. Is it possible to include this patch and make 
this information available out of the box?

> Partition log directory name and segments information exposed via JMX
> -
>
> Key: KAFKA-1614
> URL: https://issues.apache.org/jira/browse/KAFKA-1614
> Project: Kafka
>  Issue Type: Improvement
>  Components: log
>Affects Versions: 0.8.1.1
>Reporter: Alexander Demidko
>Assignee: Alexander Demidko
>Priority: Major
> Attachments: log_segments_dir_jmx_info.patch
>
>
> Exposes the partition log directory and per-segment information via 
> JMX. This is useful to:
> - monitor disk usage in a cluster and on a single broker
> - calculate the disk space taken by different topics
> - estimate the space to be freed when segments are expired
> Patch attached.





[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()

2018-05-08 Thread Anurag Jain (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467121#comment-16467121
 ] 

Anurag Jain commented on KAFKA-2200:


Hi [~becket_qin],

Please assign this Jira story to me; I will start working on it and keep you 
updated.

 

Thanks,

> kafkaProducer.send() should not call callback.onCompletion()
> 
>
> Key: KAFKA-2200
> URL: https://issues.apache.org/jira/browse/KAFKA-2200
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> KafkaProducer.send() should not call callback.onCompletion() because this 
> might break the callback firing order.





[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()

2018-05-08 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467022#comment-16467022
 ] 

Jiangjie Qin commented on KAFKA-2200:
-

[~anurag_iit2k] Sure, please feel free to take this ticket.

> kafkaProducer.send() should not call callback.onCompletion()
> 
>
> Key: KAFKA-2200
> URL: https://issues.apache.org/jira/browse/KAFKA-2200
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> KafkaProducer.send() should not call callback.onCompletion() because this 
> might break the callback firing order.





[jira] [Commented] (KAFKA-2200) kafkaProducer.send() should not call callback.onCompletion()

2018-05-08 Thread Jiangjie Qin (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16467020#comment-16467020
 ] 

Jiangjie Qin commented on KAFKA-2200:
-

[~PnPie] Sorry about the belated response. Yes, you are right, I don't think 
the TimeoutException on waiting for metadata is an ApiException; that is purely 
client-side behavior. RecordTooLargeException is a little trickier: 
conceptually it is an ApiException, but the producer may throw it without 
talking to the server. I wonder if it is simpler to just throw an 
IllegalArgumentException when the records are too large in producer.send(), if 
we want to make sure all the ApiExceptions are handled in the callback.
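
For illustration, a minimal sketch of the ordering concern (topic name, serializers and sizes are hypothetical, exception handling is omitted, and whether record 1 ever completes depends on a reachable broker):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("max.request.size", "1024");  // deliberately small so the second record is "too large"

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    // Record 1: its callback fires later, from the sender I/O thread.
    producer.send(new ProducerRecord<>("test", "small"),
                  (metadata, exception) -> System.out.println("record 1 completed"));
    // Record 2: RecordTooLargeException is an ApiException, so send() invokes this
    // callback inline on the caller thread; it can run before record 1's callback
    // even though record 1 was sent first, which is the ordering issue discussed here.
    producer.send(new ProducerRecord<>("test", new String(new char[4096])),
                  (metadata, exception) -> System.out.println("record 2 completed: " + exception));
}
{code}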

> kafkaProducer.send() should not call callback.onCompletion()
> 
>
> Key: KAFKA-2200
> URL: https://issues.apache.org/jira/browse/KAFKA-2200
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.10.1.0
>Reporter: Jiangjie Qin
>Priority: Major
>  Labels: newbie
> Fix For: 2.0.0
>
>
> KafkaProducer.send() should not call callback.onCompletion() because this 
> might break the callback firing order.





[jira] [Created] (KAFKA-6882) Wrong producer settings may lead to DoS on Kafka Server

2018-05-08 Thread Seweryn Habdank-Wojewodzki (JIRA)
Seweryn Habdank-Wojewodzki created KAFKA-6882:
-

 Summary: Wrong producer settings may lead to DoS on Kafka Server
 Key: KAFKA-6882
 URL: https://issues.apache.org/jira/browse/KAFKA-6882
 Project: Kafka
  Issue Type: Bug
  Components: core, producer 
Affects Versions: 1.0.1, 1.1.0
Reporter: Seweryn Habdank-Wojewodzki


The documentation of the parameters “linger.ms” and “batch.size” is a 
bit confusing. In fact, those parameters wrongly set on the producer side might 
completely destroy BROKER throughput.

I see that smart developers read the documentation of those parameters.
Then they want to have super performance and super safety, so they set 
something like this:

{code}
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
{code}

Then we have a situation where each and every message is sent separately. TCP/IP 
is kept really busy in that case, and although they wanted high throughput they 
get much less, as every message goes out separately, making the network 
communication and TCP/IP overhead significant.

Those settings are good only if someone sends critical messages once in a 
while (e.g. one message per minute), and not when throughput is important and 
thousands of messages are sent per second.
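
For contrast, a hedged sketch of settings that let the producer batch (the values are illustrative defaults, not a tuning recommendation):

{code}
// Letting records accumulate into batches amortizes the per-message network and
// acknowledgement overhead instead of flushing every single record separately.
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);        // wait up to 5 ms for a batch to fill
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);   // the default batch size in bytes
{code}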

The situation is even worse when smart developers read that for safety they 
need acknowledgements from all cluster nodes. So they add:

{code}
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
{code}

And this is the end of Kafka performance! 

Even worse it is not a problem for the Kafka producer. The problem remains at 
the server (cluster, broker) side. The server is so busy by acknowledging *each 
and every* message from all nodes, that other work is NOT performed, so the end 
to end performance is almost none.

I would like to ask you to improve the documentation of these parameters, and 
to cover the corner cases by describing in detail how extreme values (namely 
the lowest and the highest) may influence the work of the cluster.
That is the documentation part of the issue.

On the other hand, it is also a security/safety matter.

Technically, the problem is that the __commit_offsets topic is loaded with an 
enormous number of messages. This exposes the Kafka broker to a *DoS* caused 
purely by producer settings: three lines of code, a bit of load, and the Kafka 
cluster is dead.
I suppose there are ways to prevent such a situation on the cluster side, but 
it would require some logic to detect such a simple yet effective DoS.

BTW, do the Kafka admin tools provide any way to "kill" a connection when one 
producer or another causes problems?




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log

2018-05-08 Thread K B Parthasarathy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466939#comment-16466939
 ] 

K B Parthasarathy edited comment on KAFKA-6881 at 5/8/18 6:24 AM:
--

The server was never restarted after Kafka was installed. Still we got this 
exception and Kafka broker crashed.


was (Author: kbpartha):
The server was never restarted after Kafka was installed. Still we got thsi 
exception and Kafka broker crashed.

> Kafka 1.1 Broker version crashes when deleting log
> --
>
> Key: KAFKA-6881
> URL: https://issues.apache.org/jira/browse/KAFKA-6881
> Project: Kafka
>  Issue Type: Bug
> Environment: Linux
>Reporter: K B Parthasarathy
>Priority: Critical
>
> Hello
> We have been running Kafka version 1.1 on Linux for the past 3 weeks. Today 
> Kafka crashed. When we checked the server.log file, the following log was found:
> [2018-05-07 16:53:06,721] ERROR Failed to clean up log for 
> __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
>  java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
>  at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Log.replaceSegments(Log.scala:1648)
>  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>  at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>  at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Suppressed: java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log -> 
> /tmp/kafka-logs/__consumer_offsets-24/.log.deleted
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
>  ... 16 more
>  [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
>  [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs 
> (kafka.log.LogManager)
>  [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in 
> /tmp/kafka-logs have failed (kafka.log.LogManager)
>  
> Please let me know what may be the issue
>  
> Partha



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6881) Kafka 1.1 Broker version crashes when deleting log

2018-05-08 Thread K B Parthasarathy (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6881?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466939#comment-16466939
 ] 

K B Parthasarathy commented on KAFKA-6881:
--

The server was never restarted after Kafka was installed. Still we got this 
exception and the Kafka broker crashed.

> Kafka 1.1 Broker version crashes when deleting log
> --
>
> Key: KAFKA-6881
> URL: https://issues.apache.org/jira/browse/KAFKA-6881
> Project: Kafka
>  Issue Type: Bug
> Environment: Linux
>Reporter: K B Parthasarathy
>Priority: Critical
>
> Hello
> We have been running Kafka version 1.1 on Linux for the past 3 weeks. Today 
> Kafka crashed. When we checked the server.log file, the following log was found:
> [2018-05-07 16:53:06,721] ERROR Failed to clean up log for 
> __consumer_offsets-24 in dir /tmp/kafka-logs due to IOException 
> (kafka.server.LogDirFailureChannel)
>  java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:409)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:697)
>  at org.apache.kafka.common.record.FileRecords.renameTo(FileRecords.java:212)
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:415)
>  at kafka.log.Log.asyncDeleteSegment(Log.scala:1601)
>  at kafka.log.Log.$anonfun$replaceSegments$1(Log.scala:1653)
>  at kafka.log.Log.$anonfun$replaceSegments$1$adapted(Log.scala:1648)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Log.replaceSegments(Log.scala:1648)
>  at kafka.log.Cleaner.cleanSegments(LogCleaner.scala:535)
>  at kafka.log.Cleaner.$anonfun$doClean$6(LogCleaner.scala:462)
>  at kafka.log.Cleaner.$anonfun$doClean$6$adapted(LogCleaner.scala:461)
>  at scala.collection.immutable.List.foreach(List.scala:389)
>  at kafka.log.Cleaner.doClean(LogCleaner.scala:461)
>  at kafka.log.Cleaner.clean(LogCleaner.scala:438)
>  at kafka.log.LogCleaner$CleanerThread.cleanOrSleep(LogCleaner.scala:305)
>  at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:291)
>  at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
>  Suppressed: java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/__consumer_offsets-24/.log -> 
> /tmp/kafka-logs/__consumer_offsets-24/.log.deleted
>  at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>  at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>  at sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:396)
>  at sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
>  at java.nio.file.Files.move(Files.java:1395)
>  at org.apache.kafka.common.utils.Utils.atomicMoveWithFallback(Utils.java:694)
>  ... 16 more
>  [2018-05-07 16:53:06,725] INFO [ReplicaManager broker=0] Stopping serving 
> replicas in dir /tmp/kafka-logs (kafka.server.ReplicaManager)
>  [2018-05-07 16:53:06,762] INFO Stopping serving logs in dir /tmp/kafka-logs 
> (kafka.log.LogManager)
>  [2018-05-07 16:53:07,032] ERROR Shutdown broker because all log dirs in 
> /tmp/kafka-logs have failed (kafka.log.LogManager)
>  
> Please let me know what may be the issue
>  
> Partha



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6788) Grouping consumer requests per consumer coordinator in admin client

2018-05-08 Thread Liju (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-6788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16466929#comment-16466929
 ] 

Liju commented on KAFKA-6788:
-

Is this still open to be worked on? If no one else is working on it, I can 
take it up.

> Grouping consumer requests per consumer coordinator in admin client
> ---
>
> Key: KAFKA-6788
> URL: https://issues.apache.org/jira/browse/KAFKA-6788
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> In KafkaAdminClient, for some requests like describeGroup and deleteGroup, we 
> will first try to get the coordinator for each requested group id, and then 
> send the corresponding request for that group id. However, different group 
> ids could be hosted on the same coordinator, and these requests do support 
> multiple group ids being sent within the same request. So we can consider 
> optimizing this by grouping the requests per coordinator destination.
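
A minimal sketch of that grouping, assuming hypothetical helpers 
findCoordinator() and sendDescribeGroupsRequest() (this is not the real 
KafkaAdminClient code):

{code:java}
// Hypothetical sketch of the grouping idea. findCoordinator() and
// sendDescribeGroupsRequest() stand in for whatever the admin client really
// uses to resolve and contact coordinators.
Map<Node, List<String>> groupsByCoordinator = new HashMap<>();
for (String groupId : groupIds) {
    Node coordinator = findCoordinator(groupId);
    groupsByCoordinator.computeIfAbsent(coordinator, n -> new ArrayList<>()).add(groupId);
}
// One request per coordinator, carrying every group id it hosts, instead of
// one request per group id.
for (Map.Entry<Node, List<String>> entry : groupsByCoordinator.entrySet()) {
    sendDescribeGroupsRequest(entry.getKey(), entry.getValue());
}
{code}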



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)