[GitHub] [kafka] johnthotekat edited a comment on pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


johnthotekat edited a comment on pull request #9133:
URL: https://github.com/apache/kafka/pull/9133#issuecomment-670823191


   Closing this PR. Raised another PR: https://github.com/apache/kafka/pull/9146.
   This branch got a little messed up during the rebase :(
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] johnthotekat commented on pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


johnthotekat commented on pull request #9133:
URL: https://github.com/apache/kafka/pull/9133#issuecomment-670823191


   This branch got a little messed up during the rebase :(
   Raised another PR: https://github.com/apache/kafka/pull/9146.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] johnthotekat closed pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


johnthotekat closed pull request #9133:
URL: https://github.com/apache/kafka/pull/9133


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] johnthotekat opened a new pull request #9146: KAFKA-10316 Updated Kafka Streams upgrade-guide.html

2020-08-07 Thread GitBox


johnthotekat opened a new pull request #9146:
URL: https://github.com/apache/kafka/pull/9146


   * This PR updates the upgrade guide for the changes in https://github.com/apache/kafka/pull/9120.
   * Added the details on KIP-648 to the 2.7.0 upgrade notes in docs/streams/upgrade-guide.html.
   @mjsax 
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from committing

2020-08-07 Thread GitBox


mjsax commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r467291346



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -93,9 +90,7 @@ public boolean isActive() {
 public void initializeIfNeeded() {
 if (state() == State.CREATED) {
 StateManagerUtil.registerStateStores(log, logPrefix, topology, stateMgr, stateDirectory, processorContext);
-
-// initialize the snapshot with the current offsets as we don't need to commit then until they change
-offsetSnapshotSinceLastCommit = new HashMap<>(stateMgr.changelogOffsets());
+initializeCheckpoint();
Review comment:
   In the old code, we actually get a copy of the `Map`, while within `initializeCheckpoint()` we don't -- is this on purpose? Is it safe?
   
   Also, do we actually need the method? The old code was doing the exact same thing, and it's just a one-liner method -- what do we gain?
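
   (For illustration -- a minimal, self-contained sketch, not Kafka code, of why the defensive copy questioned above matters: an aliased "snapshot" silently tracks later mutations of the live map.)

   import java.util.HashMap;
   import java.util.Map;

   public class SnapshotAliasingDemo {
       public static void main(final String[] args) {
           final Map<String, Long> liveOffsets = new HashMap<>();
           liveOffsets.put("changelog-0", 0L);

           // old code's approach: defensive copy at snapshot time
           final Map<String, Long> copiedSnapshot = new HashMap<>(liveOffsets);
           // no copy: the "snapshot" is just an alias of the live map
           final Map<String, Long> aliasedSnapshot = liveOffsets;

           liveOffsets.put("changelog-0", 42L); // offsets advance later

           System.out.println(copiedSnapshot.get("changelog-0"));  // prints 0
           System.out.println(aliasedSnapshot.get("changelog-0")); // prints 42
       }
   }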

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -49,6 +59,30 @@
 this.stateDirectory = stateDirectory;
 }
 
+protected void initializeCheckpoint() {
+// we will delete the local checkpoint file after registering the state stores and loading them into the
+// state manager, therefore we should initialize the snapshot as empty to indicate over-write checkpoint needed

Review comment:
   Seems the comment is outdated? `we should initialize the snapshot as empty`

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1638,21 +1689,22 @@ public void shouldCheckpointOffsetsOnPostCommitIfCommitNeeded() {
 
 task.suspend();
 task.prepareCommit();
-task.postCommit();
+task.postCommit(false);
 
 assertEquals(Task.State.SUSPENDED, task.state());
 
 EasyMock.verify(stateManager);
 }
 
 @Test
-public void shouldSwallowExceptionOnCloseCleanError() {
+public void shouldThrowExceptionOnCloseCleanError() {
 final long offset = 543L;
 
 EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
-stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(partition1, offset)));
-EasyMock.expectLastCall();
+stateManager.checkpoint();
+EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();

Review comment:
   as above? (more below... won't add comments each time)

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
##
@@ -412,7 +412,7 @@ public void shouldInitializeOffsetsFromCheckpointFile() throws IOException {
 stateMgr.registerStore(nonPersistentStore, nonPersistentStore.stateRestoreCallback);
 stateMgr.initializeStoreOffsetsFromCheckpoint(true);
 
-assertFalse(checkpointFile.exists());
+assertTrue(checkpointFile.exists());

Review comment:
   Should we add a test for EOS, that the checkpoint file is deleted?

##
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -1259,13 +1262,47 @@ public void shouldReInitializeTopologyWhenResuming() throws IOException {
 }
 
 @Test
-public void shouldCheckpointOffsetsOnCommit() {
+public void shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
 final Long offset = 543L;
 
 EasyMock.expect(recordCollector.offsets()).andReturn(Collections.singletonMap(changelogPartition, offset)).anyTimes();
-stateManager.checkpoint(EasyMock.eq(Collections.singletonMap(changelogPartition, offset)));
+stateManager.checkpoint();
+EasyMock.expectLastCall().once();
+EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(changelogPartition));
+EasyMock.expect(stateManager.changelogOffsets())
+.andReturn(Collections.singletonMap(changelogPartition, 0L))
+.andReturn(Collections.singletonMap(changelogPartition, 10L))
+.andReturn(Collections.singletonMap(changelogPartition, 20L));
+stateManager.registerStore(stateStore, stateStore.stateRestoreCallback);
 EasyMock.expectLastCall();
+EasyMock.replay(stateManager, recordCollector);
+
+task = createStatefulTask(createConfig(false, "100"), true);
+
+task.initializeIfNeeded();
+task.completeRestoration();
+
+task.prepareCommit();
+task.postCommit(true);
+
+task.prepareCommit();
+task.postCommit(false);
+
+EasyMock.verify(recordCollector);

Review comment:
   Should we verify `stateManager`, too?
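
   (For reference, `EasyMock.verify` is varargs, so verifying both mocks is a one-line change -- a sketch of what the suggestion amounts to:)

   EasyMock.verify(recordCollector, stateManager);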

[GitHub] [kafka] ning2008wisc commented on pull request #9145: KAFKA-10370: rewind consumer to SinkTaskContext's offsets when init

2020-08-07 Thread GitBox


ning2008wisc commented on pull request #9145:
URL: https://github.com/apache/kafka/pull/9145#issuecomment-670820575


   As mentioned in https://stackoverflow.com/questions/54480715/no-current-assignment-for-partition-occurs-even-after-poll-in-kafka, another potential resolution could be to call consumer.seek(tp, offsets) in the **onPartitionsAssigned()** callback after subscribing.
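
   A minimal sketch of that approach (assuming `consumer`, the subscribed `topics`, and the `offsets` map carried by the SinkTaskContext are in scope, with the usual org.apache.kafka.clients.consumer and org.apache.kafka.common imports):

   consumer.subscribe(topics, new ConsumerRebalanceListener() {
       @Override
       public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
           // seek only once the partitions are actually assigned
           for (final TopicPartition tp : partitions) {
               final Long offset = offsets.get(tp);
               if (offset != null) {
                   consumer.seek(tp, offset);
               }
           }
       }

       @Override
       public void onPartitionsRevoked(final Collection<TopicPartition> partitions) { }
   });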



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ning2008wisc opened a new pull request #9145: KAFKA-10370: rewind consumer to SinkTaskContext's offsets when init

2020-08-07 Thread GitBox


ning2008wisc opened a new pull request #9145:
URL: https://github.com/apache/kafka/pull/9145


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-07 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to consume from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] provides a way to supply offsets from an external source (e.g. an implementation of SinkTask) to rewind the consumer.

In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and, if the offsets are not empty, (2) call consumer.seek(tp, offset) to rewind the consumer.

When the SinkTask first initializes (+start(Map props)+), we do +context.offset(offsets);+, and then in step (2) above we saw the following IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*, in this case.
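
A minimal sketch of the verified workaround (tp and offset stand in for the entries supplied via WorkerSinkTaskContext; java.util.Collections is assumed imported):

{code:java}
// assign() hands the consumer an explicit assignment, so the subsequent
// seek() can no longer fail with "No current assignment for partition ..."
TopicPartition tp = new TopicPartition("test", 1);
long offset = 3L;

consumer.assign(Collections.singleton(tp)); // instead of consumer.subscribe(...)
consumer.seek(tp, offset);
{code}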

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask).

In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, and (2) call consumer.seek(tp, offset) to rewind the consumer.

When running (2), we saw the following IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
    at 

[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-07 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask).

In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, and (2) call consumer.seek(tp, offset) to rewind the consumer.

When running (2), we saw the following IllegalStateException:

{code:java}
[2020-08-07 23:53:55,752] INFO WorkerSinkTask{id=MirrorSinkConnector-0} Rewind test-1 to offset 3 (org.apache.kafka.connect.runtime.WorkerSinkTask:648)
[2020-08-07 23:53:55,752] INFO [Consumer clientId=connector-consumer-MirrorSinkConnector-0, groupId=connect-MirrorSinkConnector] Seeking to offset 3 for partition test-1 (org.apache.kafka.clients.consumer.KafkaConsumer:1592)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:187)
java.lang.IllegalStateException: No current assignment for partition test-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:368)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seekUnvalidated(SubscriptionState.java:385)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1597)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.rewind(WorkerSinkTask.java:649)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-08-07 23:53:55,752] ERROR WorkerSinkTask{id=MirrorSinkConnector-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:188)
{code}

As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. 

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295] is used to carry the offsets from the external world (e.g. an implementation of SinkTask).

In the [poll() method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312], it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext, and (2) call consumer.seek(tp, offset) to rewind the consumer.

When running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
    at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594, the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*. 


> WorkerSinkTask: IllegalStateException caused by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> 

[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-07 Thread GitBox


cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467331218



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig,
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, true))
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given
+  // request is from the control plane or not.

Review comment:
   I don't think this is quite right.  The flag should be set based on whether we're in the control plane listener OR the inter-broker listener.  It's much more common to use a separate inter-broker listener than to use a fully separate control plane listener.
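   
   (A minimal sketch of the suggested condition -- the names here are invented for illustration, not Kafka's:)
   
   // trust a request if it arrived on the control-plane listener OR the
   // inter-broker listener, since either may carry broker-to-broker traffic
   static boolean fromPrivilegedListener(final String listener,
                                         final String controlPlaneListener,
                                         final String interBrokerListener) {
       return listener.equals(controlPlaneListener) || listener.equals(interBrokerListener);
   }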





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-07 Thread GitBox


cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467331218



##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -88,15 +88,17 @@ class SocketServer(val config: KafkaConfig,
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", MetricsGroup)
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  // data-plane
-  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, true))
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  // If the control plane processor is not defined, just set the flag to true in data plane to bypass the check for whether a given
+  // request is from the control plane or not.

Review comment:
   I don't think this is quite right.  The flag should be set based on whether we're in the control plane listener OR the inter-broker listener.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-07 Thread GitBox


cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467330968



##
File path: core/src/main/scala/kafka/network/RequestChannel.scala
##
@@ -309,7 +309,10 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int,
+                     val metricNamePrefix : String,
+                     time: Time,
+                     val maybeFromControlPlane: Boolean) extends KafkaMetricsGroup {

Review comment:
   Instead of `maybeFromControlPlane`, how about `fromControlPlaneListener`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-07 Thread GitBox


cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467330705



##
File path: clients/src/main/java/org/apache/kafka/server/authorizer/AuthorizableRequestContext.java
##
@@ -69,4 +69,11 @@
 * Returns the correlation id from the request header.
 */
 int correlationId();
+
+/**
+ * Returns the initial principal name for a forwarded request.
+ */
+default String initialPrincipalName() {

Review comment:
   Does this need to be here?  I'm concerned that having it here will eventually lead to us using it for authorization, which it shouldn't be used for.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-07 Thread GitBox


cmccabe commented on a change in pull request #9144:
URL: https://github.com/apache/kafka/pull/9144#discussion_r467330529



##
File path: clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
##
@@ -37,11 +37,22 @@ public RequestHeader(Struct struct, short headerVersion) {
 }
 
 public RequestHeader(ApiKeys requestApiKey, short requestVersion, String clientId, int correlationId) {
-this(new RequestHeaderData().
-    setRequestApiKey(requestApiKey.id).
-    setRequestApiVersion(requestVersion).
-    setClientId(clientId).
-    setCorrelationId(correlationId),
+this(requestApiKey, requestVersion, clientId, correlationId, null, null);
+}
+
+public RequestHeader(ApiKeys requestApiKey,

Review comment:
   I think it would be good to have a constructor that didn't have the two forwarding fields, to avoid changing a large number of tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #9144: KAFKA-9705: (part-1) add redirection fields in the request header

2020-08-07 Thread GitBox


abbccdda opened a new pull request #9144:
URL: https://github.com/apache/kafka/pull/9144


   Add the redirection supporting fields, including:
   
   1. initial principal name
   2. initial client id
   3. the flag to indicate whether a given request is coming from the control plane in a secured environment.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10371) Partition reassignments can result in crashed ReplicaFetcherThreads.

2020-08-07 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10371:

Affects Version/s: (was: 2.7.0)

> Partition reassignments can result in crashed ReplicaFetcherThreads.
> 
>
> Key: KAFKA-10371
> URL: https://issues.apache.org/jira/browse/KAFKA-10371
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Steve Rodrigues
>Assignee: David Jacot
>Priority: Critical
>
> A Kafka system doing partition reassignments got stuck with the reassignment 
> partially done and the system with a non-zero number of URPs and increasing 
> max lag.
> Looking in the logs, we see: 
> {noformat}
> [ERROR] 2020-07-31 21:22:23,984 [ReplicaFetcherThread-0-3] kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Error due to
> org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while fetching partition state for foo
> [INFO] 2020-07-31 21:22:23,986 [ReplicaFetcherThread-0-3] kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Stopped
> {noformat}
> Investigating further and with some helpful changes to the exception (which was not generating a stack trace because it was a client-side exception), we see on a test run:
> {noformat}
> [2020-08-06 19:58:21,592] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while fetching partition state for topic-test-topic-85
> at org.apache.kafka.common.protocol.Errors.exception(Errors.java:415)
> at kafka.server.ReplicaManager.getPartitionOrException(ReplicaManager.scala:645)
> at kafka.server.ReplicaManager.localLogOrException(ReplicaManager.scala:672)
> at kafka.server.ReplicaFetcherThread.logStartOffset(ReplicaFetcherThread.scala:133)
> at kafka.server.ReplicaFetcherThread.$anonfun$buildFetch$1(ReplicaFetcherThread.scala:316)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
> at kafka.server.ReplicaFetcherThread.buildFetch(ReplicaFetcherThread.scala:309)
> {noformat}
> It appears that the fetcher is attempting to fetch for a partition that has 
> been getting reassigned away. From further investigation, it seems that in 
> KAFKA-10002 the StopReplica code was changed from:
> 1. Remove partition from fetcher
> 2. Remove partition from partition map
> to the other way around, but now the fetcher may race and attempt to build a 
> fetch for a partition that's no longer mapped.  In particular, since the 
> logOrException code is being called from logStartOffset which isn't protected 
> against NotLeaderOrFollowerException, just against KafkaStorageException, the 
> exception isn't caught and throws all the way out, killing the replica 
> fetcher thread.
> We need to switch this back.
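
An illustrative, self-contained sketch (names are invented, not Kafka's) of the ordering race described above:

{code:java}
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class StopReplicaOrderingDemo {
    static final Map<String, Long> partitionMap = new ConcurrentHashMap<>();
    static final Set<String> fetchedPartitions = ConcurrentHashMap.newKeySet();

    // safe order: stop fetching first, then drop the partition state
    static void stopReplicaSafe(final String tp) {
        fetchedPartitions.remove(tp);
        partitionMap.remove(tp);
    }

    // reversed order: between the two removals, the fetcher thread can still
    // build a fetch for tp and find its state gone -- in Kafka this surfaced
    // as an uncaught NotLeaderOrFollowerException that killed the fetcher thread
    static void stopReplicaRacy(final String tp) {
        partitionMap.remove(tp);
        fetchedPartitions.remove(tp);
    }

    // what the fetcher thread does, concurrently with stopReplica*()
    static void buildFetch() {
        for (final String tp : fetchedPartitions) {
            if (partitionMap.get(tp) == null) {
                throw new IllegalStateException("no partition state for " + tp);
            }
        }
    }
}
{code}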



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10371) Partition reassignments can result in crashed ReplicaFetcherThreads.

2020-08-07 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10371.
-
Resolution: Fixed

> Partition reassignments can result in crashed ReplicaFetcherThreads.
> 
>
> Key: KAFKA-10371
> URL: https://issues.apache.org/jira/browse/KAFKA-10371
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Steve Rodrigues
>Assignee: David Jacot
>Priority: Critical
>
> A Kafka system doing partition reassignments got stuck with the reassignment 
> partially done and the system with a non-zero number of URPs and increasing 
> max lag.
> Looking in the logs, we see: 
> {noformat}
> [ERROR] 2020-07-31 21:22:23,984 [ReplicaFetcherThread-0-3] kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Error due to
> org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while fetching partition state for foo
> [INFO] 2020-07-31 21:22:23,986 [ReplicaFetcherThread-0-3] kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, fetcherId=0] Stopped
> {noformat}
> Investigating further and with some helpful changes to the exception (which was not generating a stack trace because it was a client-side exception), we see on a test run:
> {noformat}
> [2020-08-06 19:58:21,592] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while fetching partition state for topic-test-topic-85
> at org.apache.kafka.common.protocol.Errors.exception(Errors.java:415)
> at kafka.server.ReplicaManager.getPartitionOrException(ReplicaManager.scala:645)
> at kafka.server.ReplicaManager.localLogOrException(ReplicaManager.scala:672)
> at kafka.server.ReplicaFetcherThread.logStartOffset(ReplicaFetcherThread.scala:133)
> at kafka.server.ReplicaFetcherThread.$anonfun$buildFetch$1(ReplicaFetcherThread.scala:316)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
> at kafka.server.ReplicaFetcherThread.buildFetch(ReplicaFetcherThread.scala:309)
> {noformat}
> It appears that the fetcher is attempting to fetch for a partition that has 
> been getting reassigned away. From further investigation, it seems that in 
> KAFKA-10002 the StopReplica code was changed from:
> 1. Remove partition from fetcher
> 2. Remove partition from partition map
> to the other way around, but now the fetcher may race and attempt to build a 
> fetch for a partition that's no longer mapped.  In particular, since the 
> logOrException code is being called from logStartOffset which isn't protected 
> against NotLeaderOrFollowerException, just against KafkaStorageException, the 
> exception isn't caught and throws all the way out, killing the replica 
> fetcher thread.
> We need to switch this back.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads

2020-08-07 Thread GitBox


hachikuji merged pull request #9140:
URL: https://github.com/apache/kafka/pull/9140


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-07 Thread GitBox


mjsax commented on pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#issuecomment-670745736


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soondenana commented on pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads

2020-08-07 Thread GitBox


soondenana commented on pull request #9140:
URL: https://github.com/apache/kafka/pull/9140#issuecomment-670730240


   I can confirm that this fixes the issue. I ran the system test 18 times and didn't hit this issue. Without this fix, when I ran the system test 10 times I hit the issue 2 times.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-07 Thread GitBox


ableegoldman commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r467296111



##
File path: streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
 )));
 }
 
+
+
+@Test
+public void shouldReduceSlidingWindows() throws Exception {
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+final long firstBatchTimestamp = 2000L;
+final long timeDifference = 1000L;
+produceMessages(firstBatchTimestamp);
+final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+produceMessages(secondBatchTimestamp);
+final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L;
+produceMessages(thirdBatchTimestamp);
+
+final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+groupedStream
+.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
+.reduce(reducer)
+.toStream()
+.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
+startStreams();
+
+final List<KeyValueTimestamp<Windowed<String>, String>> windowedOutput = receiveMessages(
+new TimeWindowedDeserializer<>(),
+new StringDeserializer(),
+String.class,
+25);
+
+// read from ConsoleConsumer
+final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+new TimeWindowedDeserializer<String>(),
+new StringDeserializer(),
+String.class,
+25,
+true);
+
+final Comparator<KeyValueTimestamp<Windowed<String>, String>> comparator =
+Comparator.comparing((KeyValueTimestamp<Windowed<String>, String> o) -> o.key().key())
+.thenComparing(KeyValueTimestamp::value);
+
+windowedOutput.sort(comparator);
+final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+final long firstBatchRightWindow = firstBatchTimestamp + 1;
+final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference;
+final long secondBatchRightWindow = secondBatchTimestamp + 1;
+final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+final List<KeyValueTimestamp<Windowed<String>, String>> expectResult = Arrays.asList(
+new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),

Review comment:
   Clearly Kafka Streams is superior to the plain Consumer  





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from committing

2020-08-07 Thread GitBox


mjsax commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r466716421



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -49,6 +61,30 @@
 this.stateDirectory = stateDirectory;
 }
 
+protected void initializeCheckpoint() {
+// we will delete the local checkpoint file after registering the state stores and loading them into the

Review comment:
   Independent of this PR, might it be worth changing this and closing this window of risk?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 commented on a change in pull request #9142: MINOR: Fix delete_topic for system tests

2020-08-07 Thread GitBox


skaundinya15 commented on a change in pull request #9142:
URL: https://github.com/apache/kafka/pull/9142#discussion_r467274037



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -503,7 +503,7 @@ def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True):
 self.logger.info("Running topic creation command...\n%s" % cmd)
 node.account.ssh(cmd)
 
-def delete_topic(self, topic, node=None):
+def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False):

Review comment:
   Yeah, that's a good point, but in this scenario I was thinking we should remain consistent with the original implementation. Also, I'm not sure how far we want to backport this, but if we backport it past the version where `--bootstrap-server` is not available, then I think we would have to rewrite this function to account for that. Also, since we are on our way to deprecating and removing ZK, this would be a good way to start pushing us towards doing that.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr commented on a change in pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-07 Thread GitBox


dielhennr commented on a change in pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#discussion_r467273920



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -504,6 +515,7 @@ public void onSuccess(ByteBuffer value) {
 log.info("Successfully joined group with generation {}", generation.generationId);
 state = MemberState.STABLE;
 rejoinNeeded = false;
+rebalanceConfig.coordinatorUpdated();

Review comment:
   The latest commit tries to address this. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9142: MINOR: Fix delete_topic for system tests

2020-08-07 Thread GitBox


rondagostino commented on a change in pull request #9142:
URL: https://github.com/apache/kafka/pull/9142#discussion_r467263885



##
File path: tests/kafkatest/services/kafka/kafka.py
##
@@ -503,7 +503,7 @@ def create_topic(self, topic_cfg, node=None, use_zk_to_create_topic=True):
 self.logger.info("Running topic creation command...\n%s" % cmd)
 node.account.ssh(cmd)
 
-def delete_topic(self, topic, node=None):
+def delete_topic(self, topic, node=None, use_zk_to_delete_topic=False):

Review comment:
   create/describe/list use `use_zk_to_..._topic=True` so I'm wondering if it is best to remain consistent here.  For example, if running an older version, might `--bootstrap-server` not be available?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-07 Thread GitBox


lct45 commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r467260318



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamSlidingWindowAggregate.java
##
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.Window;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.kstream.SlidingWindows;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.time.Instant;
+import java.util.HashSet;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrLateRecordDropSensor;
+import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor;
+import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;
+
+public class KStreamSlidingWindowAggregate<K, V, Agg> implements KStreamAggProcessorSupplier<K, Windowed<K>, V, Agg> {
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+private final String storeName;
+private final SlidingWindows windows;
+private final Initializer<Agg> initializer;
+private final Aggregator<? super K, ? super V, Agg> aggregator;
+
+private boolean sendOldValues = false;
+
+public KStreamSlidingWindowAggregate(final SlidingWindows windows,
+  final String storeName,
+  final Initializer<Agg> initializer,
+  final Aggregator<? super K, ? super V, Agg> aggregator) {
+this.windows = windows;
+this.storeName = storeName;
+this.initializer = initializer;
+this.aggregator = aggregator;
+}
+
+@Override
+public Processor<K, V> get() {
+return new KStreamSlidingWindowAggregateProcessor();
+}
+
+public SlidingWindows windows() {
+return windows;
+}
+
+@Override
+public void enableSendingOldValues() {
+sendOldValues = true;
+}
+
+private class KStreamSlidingWindowAggregateProcessor extends AbstractProcessor<K, V> {
+private TimestampedWindowStore<K, Agg> windowStore;
+private TimestampedTupleForwarder<Windowed<K>, Agg> tupleForwarder;
+private StreamsMetricsImpl metrics;
+private InternalProcessorContext internalProcessorContext;
+private Sensor lateRecordDropSensor;
+private Sensor droppedRecordsSensor;
+private long observedStreamTime = ConsumerRecord.NO_TIMESTAMP;
+private boolean reverseIteratorImplemented = false;
+
+@SuppressWarnings("unchecked")
+@Override
+public void init(final ProcessorContext context) {
+super.init(context);
+internalProcessorContext = (InternalProcessorContext) context;
+metrics = internalProcessorContext.metrics();
+final String threadId = Thread.currentThread().getName();
+lateRecordDropSensor = droppedRecordsSensorOrLateRecordDropSensor(
+threadId,
+context.taskId().toString(),
+internalProcessorContext.currentNode().name(),
+metrics
+);
+//catch unsupported operation error
+droppedRecordsSensor = droppedRecordsSensorOrSkippedRecordsSensor(threadId, context.taskId().toString(), metrics);
+

[GitHub] [kafka] cmccabe commented on pull request #9103: Add redirection for (Incremental)AlterConfig

2020-08-07 Thread GitBox


cmccabe commented on pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#issuecomment-670693346


   I think it would be good to split this PR up a little bit.  It seems like we could have a split like this:
   
   PR 1.  `Add flag to the RequestContext` and `Add initial principal name`
   
   PR 2. Authorization changes for AlterConfigs / IncrementalAlterConfigs, forwarding if required, IBP check, bump RPC versions of AlterConfigs / IncrementalAlterConfigs
   
   PR 3. AdminClient changes in behavior based on versions of AlterConfigs / IncrementalAlterConfigs, AlterConfigsUtil, etc.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


mjsax commented on a change in pull request #9133:
URL: https://github.com/apache/kafka/pull/9133#discussion_r467244495



##
File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
##
@@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo
 }
 
 /**
- * Get the Active streams instance for given key

Review comment:
   Not sure. I am not a git guru... Can you maybe just rebase this PR against latest trunk (just to make sure we don't mess anything up)?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #9028: KAFKA-10035: Safer conversion of consumer timeout parameters

2020-08-07 Thread GitBox


mjsax commented on pull request #9028:
URL: https://github.com/apache/kafka/pull/9028#issuecomment-670689958


   Retest this please.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] johnthotekat commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


johnthotekat commented on a change in pull request #9133:
URL: https://github.com/apache/kafka/pull/9133#discussion_r467236707



##
File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
##
@@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo
 }
 
 /**
- * Get the Active streams instance for given key

Review comment:
   If you look at the commit history in the timeline, my last commit is https://github.com/apache/kafka/pull/9133/commits/14ab0529a492e9b1d361b1ab3081b8fa7189632f (this has only the html changes), and all the ones before that are already merged to trunk. Am I missing something here for the previous changes to show up in this PR too?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] johnthotekat commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


johnthotekat commented on a change in pull request #9133:
URL: https://github.com/apache/kafka/pull/9133#discussion_r467236707



##
File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
##
@@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo
 }
 
 /**
- * Get the Active streams instance for given key

Review comment:
   If you look at the commit history in the timeline, my last commit is https://github.com/apache/kafka/pull/9133/commits/14ab0529a492e9b1d361b1ab3081b8fa7189632f (this has only the html changes), and all the ones before that are already merged to trunk. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-07 Thread GitBox


lct45 commented on a change in pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#discussion_r467236081



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -47,18 +47,96 @@
 CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
 this.builder = builder;
 }
-
-<KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+<KR> KTable<KR, VOut> buildNotWindowed(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
   final Initializer<VOut> initializer,
   final NamedInternal named,
   final StoreBuilder<?> storeBuilder,
   final Serde<KR> keySerde,
   final Serde<VOut> valueSerde,
-   final String queryableName,
-   final Windows<W> windows,
-   final SessionWindows sessionWindows,
-   final Merger<? super K, VOut> sessionMerger) {
+   final String queryableName) {
+build(groupPatterns, storeBuilder);
+final Collection<StreamsGraphNode> processors = new ArrayList<>();
+boolean stateCreated = false;
+int counter = 0;
+for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+initializer,
+named.suffixWithOrElseGet(
+"-cogroup-agg-" + counter++,
+builder,
+CogroupedKStreamImpl.AGGREGATE_NAME),
+stateCreated,
+storeBuilder,
+new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue()));
+stateCreated = true;
+processors.add(statefulProcessorNode);
+builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+}
+return createTable(processors, named, keySerde, valueSerde, queryableName);
+}
+
+<W extends Window, KR> KTable<KR, VOut> buildTimeWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+  final Initializer<VOut> initializer,
+  final NamedInternal named,
+  final StoreBuilder<?> storeBuilder,
+  final Serde<KR> keySerde,
+  final Serde<VOut> valueSerde,
+  final String queryableName,
+  final Windows<W> windows) {
+build(groupPatterns, storeBuilder);
 
+final Collection<StreamsGraphNode> processors = new ArrayList<>();
+boolean stateCreated = false;
+int counter = 0;
+for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
+final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
+initializer,
+named.suffixWithOrElseGet(
+"-cogroup-agg-" + counter++,
+builder,
+CogroupedKStreamImpl.AGGREGATE_NAME),
+stateCreated,
+storeBuilder,
+new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, kGroupedStream.getValue()));
+stateCreated = true;
+processors.add(statefulProcessorNode);
+builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
+}
+return createTable(processors, named, keySerde, valueSerde, queryableName);
+}
+
+<KR> KTable<KR, VOut> buildSessionWindows(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
+  final Initializer<VOut> initializer,
+  final NamedInternal named,
+  final StoreBuilder<?> storeBuilder,
+  final Serde<KR> keySerde,
+  final Serde<VOut> valueSerde,
+  final String queryableName,
+  final SessionWindows sessionWindows,
+

[GitHub] [kafka] johnthotekat commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


johnthotekat commented on a change in pull request #9133:
URL: https://github.com/apache/kafka/pull/9133#discussion_r467235567



##
File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
##
@@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo
 }
 
 /**
- * Get the Active streams instance for given key

Review comment:
   Yeah, this should only update the html file. I raised the PR from the same branch. Err, I wonder: as these changes were already merged, should they even be showing as changes here? I mean the previous changes which were already merged.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9133: KAFKA-10316: Updated Kafka Streams upgrade notes.

2020-08-07 Thread GitBox


mjsax commented on a change in pull request #9133:
URL: https://github.com/apache/kafka/pull/9133#discussion_r467233225



##
File path: streams/src/main/java/org/apache/kafka/streams/KeyQueryMetadata.java
##
@@ -50,32 +50,65 @@ public KeyQueryMetadata(final HostInfo activeHost, final Set<HostInfo> standbyHo
 }
 
 /**
- * Get the Active streams instance for given key

Review comment:
   As the original PR is merged already, this seems to be redundant (similar below for all other code changes). -- This PR should only update the html file?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9039: KAFKA-5636: SlidingWindows (KIP-450)

2020-08-07 Thread GitBox


mjsax commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r467231847



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
##
@@ -459,6 +460,95 @@ public void shouldGroupByKey() throws Exception {
 )));
 }
 
+
+
+@Test
+public void shouldReduceSlidingWindows() throws Exception {
+streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0);
+final long firstBatchTimestamp = 2000L;
+final long timeDifference = 1000L;
+produceMessages(firstBatchTimestamp);
+final long secondBatchTimestamp = firstBatchTimestamp + timeDifference / 2;
+produceMessages(secondBatchTimestamp);
+final long thirdBatchTimestamp = secondBatchTimestamp + timeDifference - 100L;
+produceMessages(thirdBatchTimestamp);
+
+final Serde> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class);
+groupedStream
+.windowedBy(SlidingWindows.withTimeDifferenceAndGrace(ofMillis(timeDifference), ofMillis(2000L)))
+.reduce(reducer)
+.toStream()
+.to(outputTopic, Produced.with(windowedSerde, Serdes.String()));
+
+startStreams();
+
+final List, String>> windowedOutput = receiveMessages(
+new TimeWindowedDeserializer<>(),
+new StringDeserializer(),
+String.class,
+25);
+
+// read from ConsoleConsumer
+final String resultFromConsoleConsumer = readWindowedKeyedMessagesViaConsoleConsumer(
+new TimeWindowedDeserializer(),
+new StringDeserializer(),
+String.class,
+25,
+true);
+
+final Comparator, String>> comparator =
+Comparator.comparing((KeyValueTimestamp, String> o) -> o.key().key())
+.thenComparing(KeyValueTimestamp::value);
+
+windowedOutput.sort(comparator);
+final long firstBatchLeftWindow = firstBatchTimestamp - timeDifference;
+final long firstBatchRightWindow = firstBatchTimestamp + 1;
+final long secondBatchLeftWindow = secondBatchTimestamp - timeDifference;
+final long secondBatchRightWindow = secondBatchTimestamp + 1;
+final long thirdBatchLeftWindow = thirdBatchTimestamp - timeDifference;
+
+final List, String>> expectResult = Arrays.asList(
+new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(firstBatchLeftWindow, Long.MAX_VALUE)), "A", firstBatchTimestamp),

Review comment:
   Thanks. I missed the point that this trick to pass in the windowSize only works for KafkaStreams when we pass in `Serdes` objects that are used as provided...





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 opened a new pull request #9143: MINOR: Fix the way total consumed is calculated for verifiable consumer

2020-08-07 Thread GitBox


skaundinya15 opened a new pull request #9143:
URL: https://github.com/apache/kafka/pull/9143


   Currently, the way we calculate the number of total consumed messages for the verifiable consumer overcounts the number of actually consumed messages. This PR fixes the calculation so that consumed messages are counted correctly.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] skaundinya15 opened a new pull request #9142: MINOR: fixing delete topic for system tests

2020-08-07 Thread GitBox


skaundinya15 opened a new pull request #9142:
URL: https://github.com/apache/kafka/pull/9142


   The current implementation of `delete_topic` in `kafka.py` doesn't take into 
account security credentials and thus fails when running tests with security 
enabled and trying to delete a topic. This PR is to fix this issue.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-07 Thread GitBox


ableegoldman commented on a change in pull request #9141:
URL: https://github.com/apache/kafka/pull/9141#discussion_r467210400



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -47,18 +47,96 @@
 CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
 this.builder = builder;
 }
-
- KTable build(final Map, Aggregator> groupPatterns,
+ KTable buildNotWindowed(final Map, Aggregator> groupPatterns,

Review comment:
   WDYT about naming this just `build`? It's not as clear, but I think it's in line with the naming conventions elsewhere. For example we have `KStreamWindowAggregate` and `KStreamAggregate` (then maybe we can come up with a more descriptive name for the method currently called `build`).

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -85,68 +163,39 @@
 groupedStreams.remove(kGrouped);
 kGrouped.ensureCopartitionWith(groupedStreams);
 
-final Collection processors = new ArrayList<>();
-boolean stateCreated = false;
-int counter = 0;
-for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) {
-final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(
-kGroupedStream.getValue(),
-initializer,
-named.suffixWithOrElseGet(
-"-cogroup-agg-" + counter++,
-builder,
-CogroupedKStreamImpl.AGGREGATE_NAME),
-stateCreated,
-storeBuilder,
-windows,
-sessionWindows,
-sessionMerger);
-stateCreated = true;
-processors.add(statefulProcessorNode);
-builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
-}
+}
+
+ KTable createTable(final Collection processors,
+ final NamedInternal named,
+ final Serde keySerde,
+ final Serde valueSerde,
+ final String queryableName) {
 final String mergeProcessorName = named.suffixWithOrElseGet(
-"-cogroup-merge",
-builder,
-CogroupedKStreamImpl.MERGE_NAME);
+"-cogroup-merge",
+builder,
+CogroupedKStreamImpl.MERGE_NAME);
 final ProcessorSupplier passThrough = new PassThrough<>();
 final ProcessorGraphNode mergeNode =
-new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
+new ProcessorGraphNode<>(mergeProcessorName, new ProcessorParameters<>(passThrough, mergeProcessorName));
 
 builder.addGraphNode(processors, mergeNode);
 
 return new KTableImpl(
-mergeProcessorName,
-keySerde,
-valueSerde,
-Collections.singleton(mergeNode.nodeName()),
-queryableName,
-passThrough,
-mergeNode,
-builder);
+mergeProcessorName,
+keySerde,
+valueSerde,
+Collections.singleton(mergeNode.nodeName()),
+queryableName,
+passThrough,
+mergeNode,
+builder);
 }
 
-private  StatefulProcessorNode getStatefulProcessorNode(final Aggregator aggregator,
-   final Initializer initializer,
+private  StatefulProcessorNode getStatefulProcessorNode(final Initializer initializer,

Review comment:
   We can remove the `` now, right? Also it looks like 
the `initializer` input isn't needed anymore either

##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##
@@ -85,68 +163,39 @@
 groupedStreams.remove(kGrouped);
 kGrouped.ensureCopartitionWith(groupedStreams);
 
-final Collection processors = new ArrayList<>();
-boolean stateCreated = false;
-int counter = 0;
-for (final Entry, Aggregator> kGroupedStream : groupPatterns.entrySet()) {
-final StatefulProcessorNode statefulProcessorNode = getStatefulProcessorNode(
-kGroupedStream.getValue(),
-initializer,
-named.suffixWithOrElseGet(
-"-cogroup-agg-" + counter++,
-builder,
-CogroupedKStreamImpl.AGGREGATE_NAME),
-stateCreated,
-storeBuilder,
-   

[GitHub] [kafka] abbccdda commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

2020-08-07 Thread GitBox


abbccdda commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r467215647



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -497,10 +497,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def alterConfigsRequest =
 new AlterConfigsRequest.Builder(
-  Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
-new AlterConfigsRequest.Config(Collections.singleton(
-  new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100")
-))), true).build()
+Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
+  new AlterConfigsRequest.Config(Collections.singleton(
+new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100")
+  ))), true).build()

Review comment:
   Let me check around.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #9103: Add redirection for (Incremental)AlterConfig

2020-08-07 Thread GitBox


cmccabe commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r467214507



##
File path: 
core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
##
@@ -497,10 +497,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def alterConfigsRequest =
 new AlterConfigsRequest.Builder(
-  Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
-new AlterConfigsRequest.Config(Collections.singleton(
-  new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100")
-))), true).build()
+Collections.singletonMap(new ConfigResource(ConfigResource.Type.TOPIC, tp.topic),
+  new AlterConfigsRequest.Config(Collections.singleton(
+new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "100")
+  ))), true).build()

Review comment:
   Can we get rid of whitespace-only changes like this, or at least move 
them to another PR?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #9136: KAFKA-10211: Add DirectoryConfigProvider

2020-08-07 Thread GitBox


mimaison commented on a change in pull request #9136:
URL: https://github.com/apache/kafka/pull/9136#discussion_r467206725



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An implementation of {@link ConfigProvider} based on a directory of files.
+ * Property keys correspond to the names of the regular (i.e. non-directory)
+ * files in a directory given by the path parameter.
+ * Property values are taken from the file contents corresponding to each key.
+ */
+public class DirectoryConfigProvider implements ConfigProvider {
+
+private final Logger log = LoggerFactory.getLogger(getClass());
+
+@Override
+public void configure(Map configs) { }
+
+@Override
+public void close() throws IOException { }
+
+/**
+ * Retrieves the data contained in regular files in the directory given by {@code path}.
+ * Non-regular files (such as directories) in the given directory are silently ignored.
+ * @param path the directory where data files reside.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String path) {
+return get(path, File::isFile);
+}
+
+/**
+ * Retrieves the data contained in the regular files named by {@code keys} in the directory given by {@code path}.
+ * Non-regular files (such as directories) in the given directory are silently ignored.
+ * @param path the directory where data files reside.
+ * @param keys the keys whose values will be retrieved.
+ * @return the configuration data.
+ */
+@Override
+public ConfigData get(String path, Set keys) {
+return get(path, pathname ->
+pathname.isFile()
+&& keys.contains(pathname.getName()));
+}
+
+private ConfigData get(String path, FileFilter fileFilter) {
+Map map = new HashMap<>();
+if (path != null && !path.isEmpty()) {
+File dir = new File(path);
+if (!dir.isDirectory()) {
+log.warn("The path {} is not a directory", path);
+} else {
+for (File file : dir.listFiles(fileFilter)) {

Review comment:
   Should we use `java.nio.Files` here instead? That will also give us an 
Exception if an IO error happens instead of `null`.
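   For illustration, a minimal sketch of the `java.nio` variant being suggested (the class name and structure are illustrative only, not part of the PR):

```java
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.config.ConfigException;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Stream;

final class NioDirectoryRead {
    // Reads each regular file matching `filter` under `path` into a map keyed by
    // file name. Files.list() surfaces I/O problems as an IOException instead of
    // returning null the way File.listFiles() does.
    static ConfigData get(String path, Predicate<Path> filter) {
        Map<String, String> map = new HashMap<>();
        if (path != null && !path.isEmpty()) {
            try (Stream<Path> files = Files.list(Paths.get(path))) {
                files.filter(Files::isRegularFile).filter(filter).forEach(file -> {
                    try {
                        map.put(file.getFileName().toString(),
                                new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
                    } catch (IOException e) {
                        throw new ConfigException("Could not read file " + file + ": " + e.getMessage());
                    }
                });
            } catch (IOException e) {
                throw new ConfigException("Could not list directory " + path + ": " + e.getMessage());
            }
        }
        return new ConfigData(map);
    }
}
```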

##
File path: 
clients/src/main/java/org/apache/kafka/common/config/provider/DirectoryConfigProvider.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.config.provider;
+
+import org.apache.kafka.common.config.ConfigData;
+import org.apache.kafka.common.config.ConfigException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import 

[jira] [Commented] (KAFKA-10223) ReplicaNotAvailableException must be retriable to handle reassignments

2020-08-07 Thread Dongjoon Hyun (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173403#comment-17173403
 ] 

Dongjoon Hyun commented on KAFKA-10223:
---

Hi, [~rsivaram]. Is this only a 2.6.0 issue?

> ReplicaNotAvailableException must be retriable to handle reassignments
> --
>
> Key: KAFKA-10223
> URL: https://issues.apache.org/jira/browse/KAFKA-10223
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Critical
> Fix For: 2.6.0
>
>
> ReplicaNotAvailableException should be a retriable `InvalidMetadataException` 
> since consumers may throw this during reassignments.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-08-07 Thread GitBox


mimaison commented on pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#issuecomment-670643947


   @abbccdda I've addressed your comments, can you take another look?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-08-07 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r467194925



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1221,28 +1226,39 @@ private DeleteGroupsResponse createDeleteGroupsResponse() {
 
 private ListOffsetRequest createListOffsetRequest(int version) {
 if (version == 0) {
-Map offsetData = Collections.singletonMap(
-new TopicPartition("test", 0),
-new ListOffsetRequest.PartitionData(100L, 10));
+ListOffsetTopic topic = new ListOffsetTopic()

Review comment:
   Not entirely sure. It's only used 3 times so we're not going to save 
very much.
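   For reference, the helper under discussion might look like this (hypothetical, not in the PR; it assumes the message-generated `ListOffsetTopic`/`ListOffsetPartition` setters this PR introduces and a `java.util.Collections` import):

```java
// Hypothetical helper: builds a single-partition ListOffsetTopic for tests.
private static ListOffsetTopic singletonTopic(String name, int partitionIndex,
                                              long timestamp, int maxNumOffsets) {
    ListOffsetPartition partition = new ListOffsetPartition()
        .setPartitionIndex(partitionIndex)
        .setTimestamp(timestamp)
        .setMaxNumOffsets(maxNumOffsets);
    return new ListOffsetTopic()
        .setName(name)
        .setPartitions(Collections.singletonList(partition));
}
```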





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on a change in pull request #8295: KAFKA-9627: Replace ListOffset request/response with automated protocol

2020-08-07 Thread GitBox


mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r467193718



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1251,15 +1267,28 @@ private ListOffsetRequest createListOffsetRequest(int version) {
 
 private ListOffsetResponse createListOffsetResponse(int version) {
 if (version == 0) {
-Map responseData = new HashMap<>();
-responseData.put(new TopicPartition("test", 0),
-new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
-return new ListOffsetResponse(responseData);
+ListOffsetResponseData data = new ListOffsetResponseData()

Review comment:
   I'm not sure about adding extra methods to `ListOffsetResponse` just to 
remove a few lines in tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-07 Thread GitBox


rondagostino commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r467191680



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -508,7 +563,15 @@ object ConfigCommand extends Config {
 
  val entityStr = (entitySubstr(ClientQuotaEntity.USER) ++ entitySubstr(ClientQuotaEntity.CLIENT_ID)).mkString(", ")
  val entriesStr = entries.asScala.map(e => s"${e._1}=${e._2}").mkString(", ")
-  println(s"Configs for ${entityStr} are ${entriesStr}")
+  println(s"Quota configs for ${entityStr} are ${entriesStr}")
+}
+// we describe user SCRAM credentials only when we are not describing client information
+// and we are not given either --entity-default or --user-defaults
+if (!entityTypes.contains(ConfigType.Client) && !entityNames.contains("")) {
+  getUserScramCredentialConfigs(adminClient, entityNames).foreach { case (user, description) =>

Review comment:
   I'm adding 
`core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala` -- 
let me know if this test covers this case.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9110: MINOR: Ensure a reason is logged for all segment deletion operations

2020-08-07 Thread GitBox


hachikuji merged pull request #9110:
URL: https://github.com/apache/kafka/pull/9110


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on a change in pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded

2020-08-07 Thread GitBox


lbradstreet commented on a change in pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#discussion_r467187874



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -422,10 +421,13 @@ synchronized int numAssignedPartitions() {
 }
 
 synchronized List fetchablePartitions(Predicate isAvailable) {
-return assignment.stream()
-.filter(tpState -> isAvailable.test(tpState.topicPartition()) && tpState.value().isFetchable())
-.map(PartitionStates.PartitionState::topicPartition)
-.collect(Collectors.toList());
+List result = new ArrayList<>();

Review comment:
   We should add a small comment that this is in the hotpath and is written 
the "ugly" way for a reason. It's also probably worth mentioning that we do the 
cheap isFetchable check first.
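   For illustration, the pattern being asked for as a self-contained sketch (types simplified; this is not the actual `SubscriptionState` internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of the "cheap check first" hot-path pattern: a plain loop instead of
// the Stream API, with the inexpensive fetchable-state check evaluated before
// the caller-supplied availability predicate.
final class HotPathFilter {
    static <P> List<P> fetchablePartitions(Map<P, Boolean> isFetchableByPartition,
                                           Predicate<P> isAvailable) {
        List<P> result = new ArrayList<>(isFetchableByPartition.size());
        for (Map.Entry<P, Boolean> entry : isFetchableByPartition.entrySet()) {
            if (entry.getValue() && isAvailable.test(entry.getKey())) // cheap check first
                result.add(entry.getKey());
        }
        return result;
    }
}
```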





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet closed pull request #9123: MINOR: optimize fetchablePartitions check by performing cheap check first

2020-08-07 Thread GitBox


lbradstreet closed pull request #9123:
URL: https://github.com/apache/kafka/pull/9123


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded

2020-08-07 Thread GitBox


hachikuji commented on a change in pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#discussion_r467143222



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -1120,17 +1123,28 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren
 }
 }
 
+/**
+ * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then
+ * we should check that all of the assignments have a valid position.
+ * we should check that all of the assignments have a valid position.
+ */
+private void maybeValidateAssignments() {

Review comment:
   The name here threw me off because of its generality. I think we need to 
work metadata into the name to make it clearer. Maybe 
`validatePositionsOnMetadataChange` or `validatePositionsForLeaderChanges`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 opened a new pull request #9141: MINOR: Improve checks for CogroupedStreamAggregateBuilder

2020-08-07 Thread GitBox


lct45 opened a new pull request #9141:
URL: https://github.com/apache/kafka/pull/9141


   Updated `CogroupedStreamAggregateBuilder` to have individual builders depending on the type of windowed aggregation, or the lack thereof. This replaces the previous approach of passing all options into a single builder, with all but the current type of aggregation set to null, and then checking which value was not null.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9131: KAFKA-10367: Allow running the Streams demo app with a config file

2020-08-07 Thread GitBox


mimaison commented on pull request #9131:
URL: https://github.com/apache/kafka/pull/9131#issuecomment-670634953


   Thanks @mjsax, I wasn't expecting a review this quickly! Sorry for not fully 
checking the changes before opening the PR.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-08-07 Thread GitBox


omkreddy commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-670626828


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-08-07 Thread GitBox


ableegoldman commented on pull request #8976:
URL: https://github.com/apache/kafka/pull/8976#issuecomment-670624337


   Awesome, thanks @jeqo ! Let me know when the first PR is ready for review 
again (or is it ready now?)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy closed pull request #9050: KAFKA-10193: Add preemption for controller events that have callbacks

2020-08-07 Thread GitBox


omkreddy closed pull request #9050:
URL: https://github.com/apache/kafka/pull/9050


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads

2020-08-07 Thread GitBox


dajac commented on pull request #9140:
URL: https://github.com/apache/kafka/pull/9140#issuecomment-670607229


   Jira: https://issues.apache.org/jira/browse/KAFKA-10374



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10374) Add concurrency tests for the ReplicaManager

2020-08-07 Thread David Jacot (Jira)
David Jacot created KAFKA-10374:
---

 Summary: Add concurrency tests for the ReplicaManager
 Key: KAFKA-10374
 URL: https://issues.apache.org/jira/browse/KAFKA-10374
 Project: Kafka
  Issue Type: Improvement
Reporter: David Jacot


We recently discovered a regression in the ReplicaManager that was due to a concurrency issue: KAFKA-10371.

We should add concurrency tests for this area of the broker to ensure that we 
catch similar issues in the future.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads

2020-08-07 Thread GitBox


hachikuji commented on pull request #9140:
URL: https://github.com/apache/kafka/pull/9140#issuecomment-670606960


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10370) WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) when (tp, offsets) are supplied by WorkerSinkTaskContext

2020-08-07 Thread Ning Zhang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10370?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ning Zhang updated KAFKA-10370:
---
Description: 
In WorkerSinkTask.java, when we want the consumer to start consuming from certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets in from the external world (e.g. an implementation of SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first calls "rewind()" to (1) read the offsets from WorkerSinkTaskContext and (2) call consumer.seek(tp, offset) to rewind the consumer.

When running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution that has been initially verified is to use *consumer.assign* with *consumer.seek*, instead of *consumer.subscribe*.
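
A minimal sketch of that resolution (the method shape and names are illustrative only):
{code:java}
import java.util.Map;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Sketch only: `offsets` stands in for the offsets carried by WorkerSinkTaskContext.
static void rewind(Consumer<byte[], byte[]> consumer, Map<TopicPartition, Long> offsets) {
    consumer.assign(offsets.keySet()); // explicit assignment, so seek() has a current assignment
    offsets.forEach(consumer::seek);   // no IllegalStateException: every partition is assigned
}
{code}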

  was:
In WorkerSinkTask.java, when we want the consumer to start consuming from 
certain offsets, rather than from the last committed offset, 
[WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
 is used to carry the offsets from external world (e.g. implementation of 
SinkTask).

In the [poll() 
method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
 it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
(2) consumer.seek(tp, offset) to rewind the consumer.

when running (2), we saw the following IllegalStateException:

{code:java}
java.lang.IllegalStateException: No current assignment for partition mytopic-1
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
at 
org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
{code}

As suggested in 
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
 the resolution is to use *consumer.assign* with *consumer.seek* , instead of 
*consumer.subscribe*


> WorkerSinkTask: IllegalStateException cased by consumer.seek(tp, offsets) 
> when (tp, offsets) are supplied by WorkerSinkTaskContext
> --
>
> Key: KAFKA-10370
> URL: https://issues.apache.org/jira/browse/KAFKA-10370
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Ning Zhang
>Assignee: Ning Zhang
>Priority: Major
> Fix For: 2.6.0
>
>
> In WorkerSinkTask.java, when we want the consumer to start consuming from 
> certain offsets, rather than from the last committed offset, 
> [WorkerSinkTaskContext|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L295]
>  is used to carry the offsets from external world (e.g. implementation of 
> SinkTask).
> In the [poll() 
> method|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L312],
>  it first call "rewind()" to (1) read the offsets from WorkerSinkTaskContext, 
> (2) consumer.seek(tp, offset) to rewind the consumer.
> when running (2), we saw the following IllegalStateException:
> {code:java}
> java.lang.IllegalStateException: No current assignment for partition mytopic-1
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:251)
> at 
> org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:276)
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1135)
> {code}
> As suggested in 
> https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition/41010594,
>  the resolution 

[GitHub] [kafka] jeqo closed pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-08-07 Thread GitBox


jeqo closed pull request #8976:
URL: https://github.com/apache/kafka/pull/8976


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-08-07 Thread GitBox


jeqo commented on pull request #8976:
URL: https://github.com/apache/kafka/pull/8976#issuecomment-670601500


   Closing in favor of #9137, #9138 and #9139 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo commented on pull request #8976: KIP-617: Allow Kafka Streams State Stores to be iterated backwards

2020-08-07 Thread GitBox


jeqo commented on pull request #8976:
URL: https://github.com/apache/kafka/pull/8976#issuecomment-670601289


   @ableegoldman great feedback. It makes sense to group the changes into 3 different PRs. As there are dependencies between the stores, let's start with the KeyValue store, then the Window store, and finally the Session store.
   
   I have created #9137, #9138, #9139 to be tackled in that order.
   
   PS. I'm considering all the comments you mentioned here.
   
   Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #9140: KAFKA-10371; Partition reassignments can result in crashed ReplicaFetcherThreads

2020-08-07 Thread GitBox


dajac opened a new pull request #9140:
URL: https://github.com/apache/kafka/pull/9140


   https://github.com/apache/kafka/pull/8672 introduced a bug that crashes the replica fetcher threads. The issue is that https://github.com/apache/kafka/pull/8672 deletes the Partitions prior to stopping the replica fetchers. As the replica fetchers rely on accessing the Partition via the ReplicaManager, they crash with a NotLeaderOrFollowerException that is not handled.
   
   This PR reverts the code to the original ordering to avoid this issue.
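   
   For illustration, a hedged sketch of the ordering this PR restores (the helper names are illustrative, not the actual ReplicaManager API):
   
```java
import java.util.Set;
import java.util.function.Consumer;
import org.apache.kafka.common.TopicPartition;

// Illustrative only: quiesce the fetcher threads for the affected partitions
// *before* deleting Partition state, so no ReplicaFetcherThread can observe a
// deleted Partition and crash on an unhandled NotLeaderOrFollowerException.
final class StopReplicasOrdering {
    static void stopReplicas(Set<TopicPartition> partitions,
                             Consumer<Set<TopicPartition>> stopFetchers,
                             Consumer<TopicPartition> deletePartition) {
        stopFetchers.accept(partitions);     // 1. stop replica fetchers first
        partitions.forEach(deletePartition); // 2. only then delete partition state
    }
}
```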
   
   The regression has been caught by our system test: 
`kafkatest.tests.core.reassign_partitions_test`.
   
   I have not managed to reproduce the issue in a unit test without 
reimplementing the entire system test in Java. I am not sure that makes sense 
as we already have it in Python.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #7222: KAFKA-8806 Reduce calls to validateOffsetsIfNeeded

2020-08-07 Thread GitBox


hachikuji commented on a change in pull request #7222:
URL: https://github.com/apache/kafka/pull/7222#discussion_r467139256



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
##
@@ -406,6 +407,10 @@ synchronized void maybeSeekUnvalidated(TopicPartition tp, long offset, OffsetRes
 return new HashSet<>(this.assignment.partitionSet());
 }
 
+public synchronized void forEachAssignedPartition(Consumer consumer) {

Review comment:
   This is unused

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -1142,6 +1156,7 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren
 
 Optional leaderOpt = position.currentLeader.leader;
 if (!leaderOpt.isPresent()) {
+log.info("Requesting metadata update for partition {} since the position {} is missing the current leader node", partition, position);

Review comment:
   This feels more like a debug level log to me. Was there a specific 
reason we needed it?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
##
@@ -1120,17 +1123,28 @@ Node selectReadReplica(TopicPartition partition, Node leaderReplica, long curren
 }
 }
 
+/**
+ * If we have seen new metadata (as tracked by {@link org.apache.kafka.clients.Metadata#updateVersion()}), then
+ * we should check that all of the assignments have a valid position.
+ */
+private void maybeValidateAssignments() {

Review comment:
   The name here threw me off because of its generality. I think we need to work metadata into the name to make it clearer. Maybe `validatePositionsOnMetadataChange` or `validatePositionsForLeaderChanges`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo opened a new pull request #9139: KAFKA-9929: Support backward iterator on SessionStore

2020-08-07 Thread GitBox


jeqo opened a new pull request #9139:
URL: https://github.com/apache/kafka/pull/9139


   Depends on https://github.com/apache/kafka/pull/9138
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo opened a new pull request #9138: Backward windowstore

2020-08-07 Thread GitBox


jeqo opened a new pull request #9138:
URL: https://github.com/apache/kafka/pull/9138


   Depends on https://github.com/apache/kafka/pull/9137
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jeqo opened a new pull request #9137: KAFKA-9929: Support reverse iterator on KeyValueStore

2020-08-07 Thread GitBox


jeqo opened a new pull request #9137:
URL: https://github.com/apache/kafka/pull/9137


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10329) Enable connector context in logs by default

2020-08-07 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173179#comment-17173179
 ] 

Dongjin Lee commented on KAFKA-10329:
-

Hi [~rhauch],

It seems like this issue is related to [KIP-653: Upgrade log4j to 
log4j2|https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2].
 Could you have a look? In my humble opinion, it would be good to handle both 
issues at once since upgrading to log4j2 inevitably introduces some backward 
compatibility break.

> Enable connector context in logs by default
> ---
>
> Key: KAFKA-10329
> URL: https://issues.apache.org/jira/browse/KAFKA-10329
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 3.0.0
>Reporter: Randall Hauch
>Priority: Blocker
>  Labels: needs-kip
> Fix For: 3.0.0
>
>
> When 
> [KIP-449|https://cwiki.apache.org/confluence/display/KAFKA/KIP-449%3A+Add+connector+contexts+to+Connect+worker+logs]
>  was implemented and released as part of AK 2.3, we chose to not enable these 
> extra logging context information by default because it was not backward 
> compatible, and anyone relying upon the `connect-log4j.properties` file 
> provided by the AK distribution would after an upgrade to AK 2.3 (or later) 
> see different formats for their logs, which could break any log processing 
> functionality they were relying upon.
> However, we should enable this in AK 3.0, whenever that comes. Doing so will 
> require a fairly minor KIP to change the `connect-log4j.properties` file 
> slightly.
> Marked this as BLOCKER since it's a backward incompatible change that we 
> definitely want to do in the 3.0.0 release.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10352) Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)

2020-08-07 Thread Dongjin Lee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17173151#comment-17173151
 ] 

Dongjin Lee commented on KAFKA-10352:
-

Hi [~dorbae],

It seems like you are using `/tmp/kafka-logs` as a storage directory, right? Then your OS may delete files under the `/tmp` directory during its periodic cleanup - I think that is the cause of the exception.

I strongly recommend not using `/tmp` as a storage directory; assign a dedicated device for storage instead.

> Error while reading checkpoint file /tmp/kafka-logs/cleaner-offset-checkpoint 
> (kafka.server.LogDirFailureChannel)
> -
>
> Key: KAFKA-10352
> URL: https://issues.apache.org/jira/browse/KAFKA-10352
> Project: Kafka
>  Issue Type: Bug
>  Components: log cleaner
>Reporter: Seongbae Chang
>Priority: Critical
>
> One of my Kafka brokers(total 3, and version 2.5.0) was shut down suddenly. 
> And then, other brokers also was shut down because of similar causes.
>  
> Main cause of this problem is '*Error while reading checkpoint file 
> /tmp/kafka-logs/cleaner-offset-checkpoint (kafka.server.LogDirFailureChannel)*
> *java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/cleaner-offset-checkpoint*'
>  
> I haven't known why this error occurs and how to solve it. Please, give me 
> some answers or comments about it. Thank you.
> And I attached the content of log files such as kafkaServer.out, 
> log-cleaner.log
>  
> kafkaServer.out
> {code:java}
> [2020-07-30 19:49:05,992] INFO [GroupMetadataManager brokerId=3] Removed 0 
> expired offsets in 0 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)[2020-07-30 19:49:05,992] INFO 
> [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 
> milliseconds. (kafka.coordinator.group.GroupMetadataManager)[2020-07-30 
> 19:56:48,080] ERROR Error while reading checkpoint file 
> /tmp/kafka-logs/cleaner-offset-checkpoint 
> (kafka.server.LogDirFailureChannel)java.nio.file.NoSuchFileException: 
> /tmp/kafka-logs/cleaner-offset-checkpoint at 
> sun.nio.fs.UnixException.translateToIOException(UnixException.java:86) at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102) at 
> sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107) at 
> sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
>  at java.nio.file.Files.newByteChannel(Files.java:361) at 
> java.nio.file.Files.newByteChannel(Files.java:407) at 
> java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
>  at java.nio.file.Files.newInputStream(Files.java:152) at 
> java.nio.file.Files.newBufferedReader(Files.java:2784) at 
> java.nio.file.Files.newBufferedReader(Files.java:2816) at 
> kafka.server.checkpoints.CheckpointFile.liftedTree2$1(CheckpointFile.scala:87)
>  at kafka.server.checkpoints.CheckpointFile.read(CheckpointFile.scala:86) at 
> kafka.server.checkpoints.OffsetCheckpointFile.read(OffsetCheckpointFile.scala:61)
>  at 
> kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$2(LogCleanerManager.scala:134)
>  at scala.collection.Iterator$$anon$10.nextCur(Iterator.scala:583) at 
> scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:597) at 
> scala.collection.mutable.ListBuffer.addAll(ListBuffer.scala:118) at 
> scala.collection.mutable.ListBuffer$.from(ListBuffer.scala:38) at 
> scala.collection.immutable.List$.from(List.scala:617) at 
> scala.collection.immutable.List$.from(List.scala:611) at 
> scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at 
> scala.collection.immutable.Iterable$.from(Iterable.scala:35) at 
> scala.collection.immutable.Iterable$.from(Iterable.scala:32) at 
> scala.collection.IterableFactory$Delegate.from(Factory.scala:288) at 
> scala.collection.IterableOps.flatMap(Iterable.scala:674) at 
> scala.collection.IterableOps.flatMap$(Iterable.scala:674) at 
> scala.collection.AbstractIterable.flatMap(Iterable.scala:921) at 
> kafka.log.LogCleanerManager.$anonfun$allCleanerCheckpoints$1(LogCleanerManager.scala:132)
>  at 
> kafka.log.LogCleanerManager.allCleanerCheckpoints(LogCleanerManager.scala:140)
>  at 
> kafka.log.LogCleanerManager.$anonfun$grabFilthiestCompactedLog$1(LogCleanerManager.scala:171)
>  at 
> kafka.log.LogCleanerManager.grabFilthiestCompactedLog(LogCleanerManager.scala:168)
>  at 
> kafka.log.LogCleaner$CleanerThread.cleanFilthiestLog(LogCleaner.scala:327) at 
> kafka.log.LogCleaner$CleanerThread.tryCleanFilthiestLog(LogCleaner.scala:314) 
> at kafka.log.LogCleaner$CleanerThread.doWork(LogCleaner.scala:303) at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)[2020-07-30 
> 19:56:48,083] WARN [ReplicaManager broker=3] Stopping serving replicas in dir 
> 

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9032: KAFKA-10259: KIP-554 Broker-side SCRAM Config API

2020-08-07 Thread GitBox


rajinisivaram commented on a change in pull request #9032:
URL: https://github.com/apache/kafka/pull/9032#discussion_r467052269



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4071,6 +4081,168 @@ void handleFailure(Throwable throwable) {
 return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
 }
 
+@Override
+public DescribeUserScramCredentialsResult describeUserScramCredentials(List users, DescribeUserScramCredentialsOptions options) {
+final KafkaFutureImpl> future = new KafkaFutureImpl<>();
+final long now = time.milliseconds();
+Call call = new Call("describeUserScramCredentials", calcDeadlineMs(now, options.timeoutMs()),
+new ControllerNodeProvider()) {

Review comment:
   Let's see what @cmccabe thinks. We can do whatever we intend to do for 
similar describe APIs in the KIP-500 world. If we are changing, then we should 
update the KIP and send a note to the KIP discussion thread.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10373) Kafka Reassign Partition is stuck with Java OutOfMemory error

2020-08-07 Thread azher khan (Jira)
azher khan created KAFKA-10373:
--

 Summary: Kafka Reassign Partition is stuck with Java OutOfMemory 
error
 Key: KAFKA-10373
 URL: https://issues.apache.org/jira/browse/KAFKA-10373
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.2.1
Reporter: azher khan


Hi Team,

While trying to run the Kafka script to reassign partitions of an existing 
topic, we are seeing a Java OutOfMemory issue.

 

The heap for Kafka is set to "-Xmx1G -Xms1G" on the kafka broker.

 
{code:java}
/opt/kafka/bin/kafka-reassign-partitions.sh --zookeeper zookeeper1:2181 --reassignment-json-file topic_kafka_topic1_reassignment.json --bootstrap-server kafkabroker1:9092 --verify
Status of partition reassignment:
[2020-08-07 XX:XX:XX,] ERROR Uncaught exception in thread 'kafka-admin-client-thread | reassign-partitions-tool': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
 at java.nio.HeapByteBuffer.(HeapByteBuffer.java:57)
 at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
 at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
 at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
 at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335)
 at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296)
 at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560)
 at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496)
 at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
 at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
 at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1116)
 at java.lang.Thread.run(Thread.java:748)
Reassignment of partition kafka_topic1-0 is still in progress
Reassignment of partition kafka_topic1-1 is still in progress
Reassignment of partition kafka_topic1-2 is still in progress{code}
 

Retried the above command after removing the "reassign_partitions" from 
zookeeper as suggested but we are seeing the same error.

 

 
{code:java}
[zk: localhost:2181(CONNECTED) 5] delete /admin/reassign_partitions
[zk: localhost:2181(CONNECTED) 7] ls /admin
[delete_topics] 
{code}
 

Would highly appreciate your advice,

Thank you in advance,

 

Regards,

Azher Khan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps

2020-08-07 Thread Kevin Tibi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Tibi resolved KAFKA-10372.

Resolution: Abandoned

The error came from an Ansible role which overrides the start script.

> [JAVA 11] Unrecognized VM option PrintGCDateStamps
> --
>
> Key: KAFKA-10372
> URL: https://issues.apache.org/jira/browse/KAFKA-10372
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Kevin Tibi
>Priority: Blocker
>  Labels: bug
> Fix For: 2.6.1
>
>
> Hello,
> I can't start kafka with JAVA 11. 
>  
> {code:java}
> kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
> kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
> kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
> will exit.{code}
> This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11.
> Solution :
> {code:java}
> -Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and 
> PrintGCDateStamps{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9366) Upgrade log4j to log4j2

2020-08-07 Thread Dongjin Lee (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjin Lee updated KAFKA-9366:
---
Summary: Upgrade log4j to log4j2  (was: please consider upgrade log4j to 
log4j2 due to critical security problem CVE-2019-17571)

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data. This can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget, if the server 
> listens to untrusted network traffic for log data. This affects all Log4j 
> 1.2 versions up to and including 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps

2020-08-07 Thread Kevin Tibi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Tibi updated KAFKA-10372:
---
Priority: Blocker  (was: Major)

> [JAVA 11] Unrecognized VM option PrintGCDateStamps
> --
>
> Key: KAFKA-10372
> URL: https://issues.apache.org/jira/browse/KAFKA-10372
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Kevin Tibi
>Priority: Blocker
>  Labels: bug
> Fix For: 2.6.1
>
>
> Hello,
> I can't start kafka with JAVA 11. 
>  
> {code:java}
> kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
> kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
> kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
> will exit.{code}
> This flag (in kafka-run-class.sh L302) was removed in Java 9 in favor of 
> unified JVM logging, so Java 11 rejects it.
> Solution:
> {code:java}
> -Xlog:::time,uptime,level,tags  # Equivalent to PrintGCTimeStamps, Uptime and 
> PrintGCDateStamps{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps

2020-08-07 Thread Kevin Tibi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Tibi updated KAFKA-10372:
---
Description: 
Hello,

I can't start kafka with JAVA 11. 

 
{code:java}
kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
will exit.{code}
This flag (in kafka-run-class.sh L302) was removed in Java 9 in favor of 
unified JVM logging, so Java 11 rejects it.

Solution:
{code:java}
-Xlog:::time,uptime,level,tags  # Equivalent to PrintGCTimeStamps, Uptime and 
PrintGCDateStamps{code}

  was:
Hello,

I can start kafka with JAVA 11. 

 
{code:java}
kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
will exit.{code}
This flag (in kafka-run-class.sh L302) is deprecated in JAVA 11.

Solution :
{code:java}
-Xlog:::time,uptime,level,tags" # Equivalent to PrintGCTimeStamps, Uptime and 
PrintGCDateStamps{code}


> [JAVA 11] Unrecognized VM option PrintGCDateStamps
> --
>
> Key: KAFKA-10372
> URL: https://issues.apache.org/jira/browse/KAFKA-10372
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Kevin Tibi
>Priority: Blocker
>  Labels: bug
> Fix For: 2.6.1
>
>
> Hello,
> I can't start kafka with JAVA 11. 
>  
> {code:java}
> kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
> kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
> kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
> will exit.{code}
> This flag (in kafka-run-class.sh L302) was removed in Java 9 in favor of 
> unified JVM logging, so Java 11 rejects it.
> Solution:
> {code:java}
> -Xlog:::time,uptime,level,tags  # Equivalent to PrintGCTimeStamps, Uptime and 
> PrintGCDateStamps{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps

2020-08-07 Thread Kevin Tibi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Tibi updated KAFKA-10372:
---
Fix Version/s: 2.6.1

> [JAVA 11] Unrecognized VM option PrintGCDateStamps
> --
>
> Key: KAFKA-10372
> URL: https://issues.apache.org/jira/browse/KAFKA-10372
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Kevin Tibi
>Priority: Major
>  Labels: bug
> Fix For: 2.6.1
>
>
> Hello,
> I can't start kafka with JAVA 11. 
>  
> {code:java}
> kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
> kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
> kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
> will exit.{code}
> This flag (in kafka-run-class.sh L302) was removed in Java 9 in favor of 
> unified JVM logging, so Java 11 rejects it.
> Solution:
> {code:java}
> -Xlog:::time,uptime,level,tags  # Equivalent to PrintGCTimeStamps, Uptime and 
> PrintGCDateStamps{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps

2020-08-07 Thread Kevin Tibi (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kevin Tibi updated KAFKA-10372:
---
Labels: bug  (was: )

> [JAVA 11] Unrecognized VM option PrintGCDateStamps
> --
>
> Key: KAFKA-10372
> URL: https://issues.apache.org/jira/browse/KAFKA-10372
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.6.0
>Reporter: Kevin Tibi
>Priority: Major
>  Labels: bug
>
> Hello,
> I can't start kafka with JAVA 11. 
>  
> {code:java}
> kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
> kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
> kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
> will exit.{code}
> This flag (in kafka-run-class.sh L302) was removed in Java 9 in favor of 
> unified JVM logging, so Java 11 rejects it.
> Solution:
> {code:java}
> -Xlog:::time,uptime,level,tags  # Equivalent to PrintGCTimeStamps, Uptime and 
> PrintGCDateStamps{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10372) [JAVA 11] Unrecognized VM option PrintGCDateStamps

2020-08-07 Thread Kevin Tibi (Jira)
Kevin Tibi created KAFKA-10372:
--

 Summary: [JAVA 11] Unrecognized VM option PrintGCDateStamps
 Key: KAFKA-10372
 URL: https://issues.apache.org/jira/browse/KAFKA-10372
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.6.0
Reporter: Kevin Tibi


Hello,

I can't start kafka with JAVA 11. 

 
{code:java}
kafka-server-start.sh[2721]: Unrecognized VM option 'PrintGCDateStamps'
kafka-server-start.sh[2721]: Error: Could not create the Java Virtual Machine.
kafka-server-start.sh[2721]: Error: A fatal exception has occurred. Program 
will exit.{code}
This flag (in kafka-run-class.sh L302) was removed in Java 9 in favor of 
unified JVM logging, so Java 11 rejects it.

Solution:
{code:java}
-Xlog:::time,uptime,level,tags  # Equivalent to PrintGCTimeStamps, Uptime and 
PrintGCDateStamps{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #9136: KAFKA-10211: Add DirectoryConfigProvider

2020-08-07 Thread GitBox


tombentley commented on pull request #9136:
URL: https://github.com/apache/kafka/pull/9136#issuecomment-670438137


   @omkreddy, @mimaison, @rajinisivaram, @kkonstantine, @dajac thanks for 
voting on the KIP. I'd be grateful for review.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley opened a new pull request #9136: KAFKA-10211: Add DirectoryConfigProvider

2020-08-07 Thread GitBox


tombentley opened a new pull request #9136:
URL: https://github.com/apache/kafka/pull/9136


   See KIP-632.
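   
   For context, a hedged sketch of how such a provider is wired up under 
KIP-632's scheme (the provider alias `dir`, the directory path, and the file 
name below are made up for illustration):
   
   ```properties
   # Register the provider under an alias and reference files from a directory
   config.providers=dir
   config.providers.dir.class=org.apache.kafka.common.config.provider.DirectoryConfigProvider
   # Resolves to the contents of the file /mnt/secrets/ssl-keystore-password
   ssl.keystore.password=${dir:/mnt/secrets:ssl-keystore-password}
   ```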
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8878: MINOR: Generator config-specific HTML ids

2020-08-07 Thread GitBox


tombentley commented on pull request #8878:
URL: https://github.com/apache/kafka/pull/8878#issuecomment-670404348


   Thinking about it, we could avoid breaking existing links by generating both 
the old and the new style ids: `whatever.config.name`. WDYT @mimaison @omkreddy?
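   
   A hedged sketch of what the generated HTML could look like under that 
scheme (anchor names are illustrative, not the generator's actual output):
   
   ```html
   <!-- keep the legacy id so existing deep links still resolve -->
   <a id="batch.size"></a>
   <!-- new config-specific id, prefixed to disambiguate across config pages -->
   <a id="producerconfigs_batch.size"></a>
   <h4>batch.size</h4>
   ```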



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tom1299 opened a new pull request #9135: MINOR [WIP] Refactor Producer class in examples

2020-08-07 Thread GitBox


tom1299 opened a new pull request #9135:
URL: https://github.com/apache/kafka/pull/9135


   I'm a newbie to the Kafka project and am going through the examples. Along 
the way I did a very small refactoring to the Producer class:
   * Converted the anonymous Callback class into a method/lambda (see the sketch below)
   
   The aim was to simplify the whole class and introduce the lambda-style / 
functional programming now available 
   in current Java versions.
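   
   A minimal sketch of the kind of change described (`producer` and `record` 
stand in for the example's producer and record; the error handling is 
illustrative):
   
   ```java
   import org.apache.kafka.clients.producer.Callback;
   import org.apache.kafka.clients.producer.RecordMetadata;
   
   // Before: explicit anonymous Callback implementation
   producer.send(record, new Callback() {
       @Override
       public void onCompletion(RecordMetadata metadata, Exception exception) {
           if (exception != null)
               exception.printStackTrace();
       }
   });
   
   // After: Callback is a functional interface, so a lambda works
   producer.send(record, (metadata, exception) -> {
       if (exception != null)
           exception.printStackTrace();
   });
   ```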
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9134: MINOR: set initial capacity for all array buffers in converting produ…

2020-08-07 Thread GitBox


chia7712 opened a new pull request #9134:
URL: https://github.com/apache/kafka/pull/9134


   This is a small optimization of the conversion of producer responses. Those 
array buffers need an initial capacity to avoid growing repeatedly when the 
request carries a bunch of small messages that are sent to different 
partitions (see the sketch below).
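   
   The idea in miniature (Java and the names here are illustrative; the PR 
itself pre-sizes Scala ArrayBuffers):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Growing from the default capacity triggers repeated allocate-and-copy
   // cycles; pre-sizing to the known count allocates the backing array once.
   int partitionCount = 500; // illustrative
   List<String> responses = new ArrayList<>(partitionCount);
   ```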
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10371) Partition reassignments can result in crashed ReplicaFetcherThreads.

2020-08-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10371?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-10371:
---

Assignee: David Jacot

> Partition reassignments can result in crashed ReplicaFetcherThreads.
> 
>
> Key: KAFKA-10371
> URL: https://issues.apache.org/jira/browse/KAFKA-10371
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.7.0
>Reporter: Steve Rodrigues
>Assignee: David Jacot
>Priority: Critical
>
> A Kafka system doing partition reassignments got stuck with the reassignment 
> partially done, a non-zero number of under-replicated partitions (URPs), and 
> increasing max lag.
> Looking in the logs, we see: 
> {noformat}
> [ERROR] 2020-07-31 21:22:23,984 [ReplicaFetcherThread-0-3] 
> kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, 
> fetcherId=0] Error due to
> org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while 
> fetching partition state for foo
> [INFO] 2020-07-31 21:22:23,986 [ReplicaFetcherThread-0-3] 
> kafka.server.ReplicaFetcherThread - [ReplicaFetcher replicaId=4, leaderId=3, 
> fetcherId=0] Stopped
> {noformat}
> Investigating further and with some helpful changes to the exception (which 
> was not generating a stack trace because it was a client-side exception), we 
> see on a test run:
> {noformat}
> [2020-08-06 19:58:21,592] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.NotLeaderOrFollowerException: Error while 
> fetching partition state for topic-test-topic-85
> at org.apache.kafka.common.protocol.Errors.exception(Errors.java:415)
> at 
> kafka.server.ReplicaManager.getPartitionOrException(ReplicaManager.scala:645)
> at 
> kafka.server.ReplicaManager.localLogOrException(ReplicaManager.scala:672)
> at 
> kafka.server.ReplicaFetcherThread.logStartOffset(ReplicaFetcherThread.scala:133)
> at 
> kafka.server.ReplicaFetcherThread.$anonfun$buildFetch$1(ReplicaFetcherThread.scala:316)
> at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:553)
> at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:551)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:920)
> at 
> kafka.server.ReplicaFetcherThread.buildFetch(ReplicaFetcherThread.scala:309)
> {noformat}
> It appears that the fetcher is attempting to fetch for a partition that has 
> been reassigned away. From further investigation, it seems that in 
> KAFKA-10002 the StopReplica code was changed from:
> 1. Remove partition from fetcher
> 2. Remove partition from partition map
> to the other way around, so the fetcher may now race and attempt to build a 
> fetch for a partition that's no longer mapped. In particular, since 
> localLogOrException is called from logStartOffset, which is protected only 
> against KafkaStorageException and not against NotLeaderOrFollowerException, 
> the exception isn't caught and propagates all the way out, killing the 
> replica fetcher thread.
> We need to switch this back (a sketch of the two orderings follows).
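> A hedged pseudocode sketch of the orderings (identifiers are illustrative, 
> not the actual ReplicaManager/fetcher-manager code):
> {code:java}
> // Unsafe ordering (current): between these two calls the fetcher thread can
> // still build a fetch for a partition whose state is already gone.
> allPartitions.remove(topicPartition);
> fetcherManager.removeFetcherForPartitions(Set.of(topicPartition));
>
> // Safe ordering (to restore): stop fetching first, then drop the state, so
> // no fetch is ever built against an unmapped partition.
> fetcherManager.removeFetcherForPartitions(Set.of(topicPartition));
> allPartitions.remove(topicPartition);
> {code}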



--
This message was sent by Atlassian Jira
(v8.3.4#803005)