[jira] [Resolved] (KAFKA-7215) Improve LogCleaner behavior on error

2018-10-08 Thread Jun Rao (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jun Rao resolved KAFKA-7215.

   Resolution: Fixed
Fix Version/s: 2.1.0

Merged to 2.1 and trunk.

> Improve LogCleaner behavior on error
> 
>
> Key: KAFKA-7215
> URL: https://issues.apache.org/jira/browse/KAFKA-7215
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.1.0
>
>
> For more detailed information see 
> [KIP-346|https://cwiki.apache.org/confluence/display/KAFKA/KIP-346+-+Improve+LogCleaner+behavior+on+error]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7492) Explain `null` handling for reduce and aggregate

2018-10-08 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-7492:
--

 Summary: Explain `null` handling for reduce and aggregate
 Key: KAFKA-7492
 URL: https://issues.apache.org/jira/browse/KAFKA-7492
 Project: Kafka
  Issue Type: Improvement
  Components: documentation, streams
Reporter: Matthias J. Sax


We currently don't explain how records with `null` value are handled in reduce 
and aggregate. In particular, what happens when the user's aggregation/reduce 
`apply()` implementation returns `null`.

We should update the JavaDocs accordingly and maybe also update the docs on the 
web page.

Cf. 
https://stackoverflow.com/questions/52692202/what-happens-if-the-aggregator-of-a-kgroupedstream-returns-null
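
A minimal sketch of the ambiguity (topic name, types, and the aggregation 
logic are illustrative assumptions, not from the ticket): records with a 
`null` value are dropped before the aggregation, but the JavaDocs do not say 
what a `null` returned from the aggregator itself means.

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class NullAggregateExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KTable<String, Long> totals = builder.<String, Long>stream("amounts")
            .groupByKey()
            .aggregate(
                () -> 0L,  // initializer
                // hypothetical aggregator: does returning null delete the key
                // from the KTable (tombstone), or is it stored as-is?
                (key, value, agg) -> value < 0 ? null : agg + value);
        System.out.println(builder.build().describe());
    }
}
{code}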



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7192) State-store can desynchronise with changelog

2018-10-08 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7192:
---
Fix Version/s: 1.0.3

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]
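
The fix merged for this ticket wipes the local state when EOS is enabled and 
no checkpoint file exists (see the PR referenced below). A simplified sketch 
of that idea, with illustrative names rather than the actual Streams internals:

{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Comparator;

public class EosStateRepairSketch {

    // Under EOS, a missing checkpoint file means the local store may contain
    // writes from an uncommitted transaction, so it cannot be trusted.
    static boolean mustWipeLocalState(boolean eosEnabled, File checkpointFile) {
        return eosEnabled && !checkpointFile.exists();
    }

    // Delete the task's state directory recursively; restoration then
    // replays the changelog topic from scratch.
    static void wipeStateDirectory(File stateDir) throws IOException {
        Files.walk(stateDir.toPath())
                .sorted(Comparator.reverseOrder())
                .map(java.nio.file.Path::toFile)
                .forEach(File::delete);
    }
}
{code}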



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7192) State-store can desynchronise with changelog

2018-10-08 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax updated KAFKA-7192:
---
Affects Version/s: 1.0.2
   2.0.0

> State-store can desynchronise with changelog
> 
>
> Key: KAFKA-7192
> URL: https://issues.apache.org/jira/browse/KAFKA-7192
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>Reporter: Jon Bates
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: bugs
> Fix For: 0.11.0.4, 1.0.3, 2.0.1, 2.1.0
>
>
> n.b. this bug has been verified with exactly-once processing enabled
> Consider the following scenario:
>  * A record, N, is read into a Kafka topology
>  * the state store is updated
>  * the topology crashes
> h3. *Expected behaviour:*
>  # Node is restarted
>  # Offset was never updated, so record N is reprocessed
>  # State-store is reset to position N-1
>  # Record is reprocessed
> h3. *Actual Behaviour*
>  # Node is restarted
>  # Record N is reprocessed (good)
>  # The state store has the state from the previous processing
> I'd consider this a corruption of the state-store, hence the critical 
> Priority, although High may be more appropriate.
> I wrote a proof-of-concept here, which demonstrates the problem on Linux:
> [https://github.com/spadger/kafka-streams-sad-state-store]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7192) State-store can desynchronise with changelog

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7192?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642553#comment-16642553
 ] 

ASF GitHub Bot commented on KAFKA-7192:
---

mjsax closed pull request #5657: KAFKA-7192: Wipe out state store if EOS is 
turned on and checkpoint file does not exist
URL: https://github.com/apache/kafka/pull/5657
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 410212e1ff0..eeaed893482 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -198,4 +198,10 @@ public ThreadCache getCache() {
 public void initialized() {
 initialized = true;
 }
+
+@Override
+public void uninitialize() {
+initialized = false;
+}
+
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index baea4af62f1..fcf2f6b13b7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -107,7 +107,7 @@ public final String applicationId() {
 }
 
 @Override
-public final Set<TopicPartition> partitions() {
+public Set<TopicPartition> partitions() {
 return partitions;
 }
 
@@ -226,6 +226,9 @@ void registerStateStores() {
 }
 }
 
+void reinitializeStateStoresForPartitions(final Collection<TopicPartition> partitions) {
+stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext);
+}
 
 /**
  * @throws ProcessorStateException if there is an error while closing the 
state manager
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 0d9d04de5cc..cfce57588e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -50,7 +50,7 @@
 // IQ may access this map.
 private Map<TaskId, Task> running = new ConcurrentHashMap<>();
 private Map<TopicPartition, Task> runningByPartition = new HashMap<>();
-private Map<TopicPartition, Task> restoringByPartition = new HashMap<>();
+private Map<TopicPartition, StreamTask> restoringByPartition = new HashMap<>();
 private int committed = 0;
 
 
@@ -122,7 +122,8 @@ void addNewTask(final Task task) {
 try {
 if (!entry.getValue().initializeStateStores()) {
 log.debug("Transitioning {} {} to restoring", 
taskTypeName, entry.getKey());
-addToRestoring(entry.getValue());
+// cast is safe, because StandbyTasks always returns 
`true` in `initializeStateStores()` above
+addToRestoring((StreamTask) entry.getValue());
 } else {
 transitionToRunning(entry.getValue(), readyPartitions);
 }
@@ -278,7 +279,7 @@ boolean maybeResumeSuspendedTask(final TaskId taskId, final Set<TopicPartition> partitions) {
 return false;
 }
 
-private void addToRestoring(final Task task) {
+private void addToRestoring(final StreamTask task) {
 restoring.put(task.id(), task);
 for (TopicPartition topicPartition : task.partitions()) {
 restoringByPartition.put(topicPartition, task);
@@ -307,7 +308,7 @@ private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions) {
 }
 
 @Override
-public Task restoringTaskFor(final TopicPartition partition) {
+public StreamTask restoringTaskFor(final TopicPartition partition) {
 return restoringByPartition.get(partition);
 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac81a6..b5719b111f0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@
  * Mark this contex as being initialized
  */
 void initialized();
+
+/**
+ * Mark this context as being uninitialized
+ */
+void unini

[jira] [Resolved] (KAFKA-7478) Reduce OAuthBearerLoginModule verbosity

2018-10-08 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7478?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7478.
---
   Resolution: Fixed
Fix Version/s: 2.2.0

> Reduce OAuthBearerLoginModule verbosity
> ---
>
> Key: KAFKA-7478
> URL: https://issues.apache.org/jira/browse/KAFKA-7478
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
> Fix For: 2.2.0
>
>
> The OAuthBearerLoginModule is pretty verbose by default, and this fills logs 
> with too much information. It would be nice if we could reduce the verbosity 
> by default and let the user opt in to these debug-friendly messages:
> {code:java}
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule login - 
> Login succeeded; invoke commit() to commit it; current committed token 
> count=0 
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule commit - 
> Committing my token; current committed token count = 0 
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] 
> org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule commit - 
> Done committing my token; committed token count is now 1
> [INFO] 2018-10-03 16:58:11,986 [qtp1137078855-1798] 
> org.apache.kafka.common.security.oauthbearer.internals.expiring.ExpiringCredentialRefreshingLogin
>  login - Successfully logged in.
> {code}
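
With the fix these messages move to DEBUG/TRACE (see the PR diff further down 
in this digest), so anyone who still wants them can opt back in per logger, 
e.g. with a log4j.properties override (logger name taken from the messages 
above):

{noformat}
log4j.logger.org.apache.kafka.common.security.oauthbearer=DEBUG
{noformat}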



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7478) Reduce OAuthBearerLoginModule verbosity

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642504#comment-16642504
 ] 

ASF GitHub Bot commented on KAFKA-7478:
---

rajinisivaram closed pull request #5738: KAFKA-7478: Reduce default logging 
verbosity in OAuthBearerLoginModule
URL: https://github.com/apache/kafka/pull/5738
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index e3a78103560..e7976b55506 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -305,7 +305,7 @@ public boolean login() throws LoginException {
 log.debug("Logged in without a token, this login cannot be used to 
establish client connections");
 
 loginState = LoginState.LOGGED_IN_NOT_COMMITTED;
-log.info("Login succeeded; invoke commit() to commit it; current 
committed token count={}",
+log.debug("Login succeeded; invoke commit() to commit it; current 
committed token count={}",
 committedTokenCount());
 return true;
 }
@@ -340,7 +340,7 @@ private void identifyExtensions() throws LoginException {
 throw new LoginException("An internal error occurred while 
retrieving SASL extensions from callback handler");
 } catch (UnsupportedCallbackException e) {
 extensionsRequiringCommit = EMPTY_EXTENSIONS;
-log.info("CallbackHandler {} does not support SASL extensions. No 
extensions will be added", callbackHandler.getClass().getName());
+log.debug("CallbackHandler {} does not support SASL extensions. No 
extensions will be added", callbackHandler.getClass().getName());
 }
 if (extensionsRequiringCommit ==  null) {
 log.error("SASL Extensions cannot be null. Check whether your 
callback handler is explicitly setting them as null.");
@@ -354,12 +354,11 @@ public boolean logout() {
 throw new IllegalStateException(
 "Cannot call logout() immediately after login(); need to 
first invoke commit() or abort()");
 if (loginState != LoginState.COMMITTED) {
-if (log.isDebugEnabled())
-log.debug("Nothing here to log out");
+log.debug("Nothing here to log out");
 return false;
 }
 if (myCommittedToken != null) {
-log.info("Logging out my token; current committed token count = 
{}", committedTokenCount());
+log.trace("Logging out my token; current committed token count = 
{}", committedTokenCount());
 for (Iterator<Object> iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext(); ) {
 Object privateCredential = iterator.next();
 if (privateCredential == myCommittedToken) {
@@ -368,15 +367,15 @@ public boolean logout() {
 break;
 }
 }
-log.info("Done logging out my token; committed token count is now 
{}", committedTokenCount());
+log.debug("Done logging out my token; committed token count is now 
{}", committedTokenCount());
 } else
 log.debug("No tokens to logout for this login");
 
 if (myCommittedExtensions != null) {
-log.info("Logging out my extensions");
+log.trace("Logging out my extensions");
 if (subject.getPublicCredentials().removeIf(e -> 
myCommittedExtensions == e))
 myCommittedExtensions = null;
-log.info("Done logging out my extensions");
+log.debug("Done logging out my extensions");
 } else
 log.debug("No extensions to logout for this login");
 
@@ -387,17 +386,16 @@ public boolean logout() {
 @Override
 public boolean commit() {
 if (loginState != LoginState.LOGGED_IN_NOT_COMMITTED) {
-if (log.isDebugEnabled())
-log.debug("Nothing here to commit");
+log.debug("Nothing here to commit");
 return false;
 }
 
 if (tokenRequiringCommit != null) {
-log.info("Committing my token; current committed token count = 
{}", committedTokenCount());
+log.trace("Committing my token; current committed token count = 
{}", committedTokenCount());
 subject.getPrivateCredentials().add(tokenRequiringCommit);
 myCommittedToken = tokenReq

[jira] [Commented] (KAFKA-7215) Improve LogCleaner behavior on error

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642395#comment-16642395
 ] 

ASF GitHub Bot commented on KAFKA-7215:
---

junrao closed pull request #5439: KAFKA-7215: Improve LogCleaner Error Handling
URL: https://github.com/apache/kafka/pull/5439
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/Log.scala 
b/core/src/main/scala/kafka/log/Log.scala
index 8915c14b364..094473a8e26 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -193,7 +193,7 @@ class Log(@volatile var dir: File,
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
-  // The memory mapped buffer for index files of this log will be closed for 
index files of this log will be closed with either delete() or closeHandlers()
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
   // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
   @volatile private var isMemoryMappedBufferClosed = false
 
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index bf4f7e1fcba..0416325a4b8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.Time
 
 import scala.collection.JavaConverters._
 import scala.collection.{Iterable, Set, mutable}
+import scala.util.control.ControlThrowable
 
 /**
  * The cleaner is responsible for removing obsolete records from logs which 
have the "compact" retention strategy.
@@ -293,49 +294,75 @@ class LogCleaner(initialConfig: CleanerConfig,
 
 /**
  * The main loop for the cleaner thread
+ * Clean a log if there is a dirty log available, otherwise sleep for a bit
  */
 override def doWork() {
-  cleanOrSleep()
+  val cleaned = cleanFilthiestLog()
+  if (!cleaned)
+pause(config.backOffMs, TimeUnit.MILLISECONDS)
 }
 
 /**
- * Clean a log if there is a dirty log available, otherwise sleep for a bit
- */
-private def cleanOrSleep() {
-  val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
-case None =>
-  false
-case Some(cleanable) =>
-  // there's a log, clean it
-  var endOffset = cleanable.firstDirtyOffset
-  try {
-val (nextDirtyOffset, cleanerStats) = cleaner.clean(cleanable)
-recordStats(cleaner.id, cleanable.log.name, 
cleanable.firstDirtyOffset, endOffset, cleanerStats)
-endOffset = nextDirtyOffset
-  } catch {
-case _: LogCleaningAbortedException => // task can be aborted, let 
it go.
-case _: KafkaStorageException => // partition is already offline. 
let it go.
-case e: IOException =>
-  val msg = s"Failed to clean up log for 
${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to 
IOException"
-  
logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
-  } finally {
-cleanerManager.doneCleaning(cleanable.topicPartition, 
cleanable.log.dir.getParentFile, endOffset)
+  * Cleans a log if there is a dirty log available
+  * @return whether a log was cleaned
+  */
+private def cleanFilthiestLog(): Boolean = {
+  var currentLog: Option[Log] = None
+
+  try {
+val cleaned = cleanerManager.grabFilthiestCompactedLog(time) match {
+  case None =>
+false
+  case Some(cleanable) =>
+// there's a log, clean it
+currentLog = Some(cleanable.log)
+cleanLog(cleanable)
+true
+}
+val deletable: Iterable[(TopicPartition, Log)] = 
cleanerManager.deletableLogs()
+try {
+  deletable.foreach {
+case (topicPartition, log) =>
+  try {
+currentLog = Some(log)
+log.deleteOldSegments()
+  }
   }
-  true
+} finally  {
+  cleanerManager.doneDeleting(deletable.map(_._1))
+}
+
+cleaned
+  } catch {
+case e @ (_: ThreadShutdownException | _: ControlThrowable) => throw e
+case e: Exception =>
+  if (currentLog.isEmpty) {
+throw new IllegalStateException("currentLog cannot be empty on an 
unexpected exception", e)
+  }
+  val erroneousLog = currentLog.get
+  warn(s"Unexpected exception thrown wh

[jira] [Commented] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0

2018-10-08 Thread Gleb Smirnov (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642300#comment-16642300
 ] 

Gleb Smirnov commented on KAFKA-7093:
-

Also seeing it in one deployment after a broker restart. It is spamming 
gigabytes of WARN log messages for two topics, one of them being 
__consumer_offsets. Kafka version is 1.0.0. Also found among these logs is 
another message:
{code:java}
[2018-10-08 18:45:13,262] WARN Resetting first dirty offset of 
__consumer_offsets-6 to log start offset 5669124 since the checkpointed offset 
5669122 is invalid. (kafka.log.LogCleanerManager$)

[2018-10-08 18:45:13,262] WARN Resetting first dirty offset of 
__consumer_offsets-35 to log start offset 1754560 since the checkpointed offset 
1754559 is invalid. (kafka.log.LogCleanerManager$)

[2018-10-08 18:45:13,262] WARN Resetting first dirty offset of 
__consumer_offsets-2 to log start offset 2002978 since the checkpointed offset 
2002976 is invalid. (kafka.log.LogCleanerManager$){code}
Not sure if it is related.

I have access to an affected environment and can perform some troubleshooting 
or gather additional logs or metrics. Please contact me at 
[m...@gvsmirnov.ru|mailto:m...@gvsmirnov.ru] if needed.

> Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
> 
>
> Key: KAFKA-7093
> URL: https://issues.apache.org/jira/browse/KAFKA-7093
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.1.0
>Reporter: Suleyman
>Priority: Major
>
> I upgraded Kafka from version 0.11.0.1 to 1.1.0. After the upgrade, I'm 
> getting the warn message below very frequently.
> WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. 
> This implies messages have arrived out of order. New: \{epoch:0, 
> offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: 
> __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) 
> How can I resolve these warn messages? And why am I getting them?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6989) Support Async Processing in Streams

2018-10-08 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6989?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642261#comment-16642261
 ] 

Ryanne Dolan commented on KAFKA-6989:
-

Samza faced this same problem in the past. The solution was to introduce an 
AsyncTask and processAsync() API that works like this:

- in processAsync(), you get a record and a callback to call when you're done 
with the record.
- processAsync() is run in a separate thread pool from the consumer
- on commit(), the framework only commits offsets up to the first outstanding 
record
- the framework blocks and won't poll() if the thread pool is full
- the thread pool size is configurable with a default of 1, which essentially 
means records are processed one-by-one and in-order
- if you increase the thread pool size, you can have N outstanding records, but 
order is no longer guaranteed

These semantics would also work for Streams just fine.
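
A minimal sketch of the offset-tracking part of that contract (illustrative 
names, not a real Kafka or Samza API): completions may arrive out of order, 
but the committable offset only advances past the longest fully-acknowledged 
prefix of records.

{code:java}
import java.util.concurrent.ConcurrentSkipListMap;

public class AsyncCommitTracker {

    // offset -> acknowledged? for every in-flight record, in offset order
    private final ConcurrentSkipListMap<Long, Boolean> inFlight = new ConcurrentSkipListMap<>();
    private volatile long committable = -1L;

    // called when a record is handed to processAsync()
    public synchronized void track(long offset) {
        inFlight.put(offset, false);
    }

    // the user's completion callback for a record
    public synchronized void ack(long offset) {
        inFlight.put(offset, true);
        // pop the longest completed prefix; its last offset is safe to commit
        while (!inFlight.isEmpty() && inFlight.firstEntry().getValue()) {
            committable = inFlight.pollFirstEntry().getKey();
        }
    }

    // on commit(), the framework commits offsets only up to this point
    public long committableOffset() {
        return committable;
    }
}
{code}

With a thread pool size of 1 this degenerates to in-order, one-by-one 
processing, matching the default behaviour described above.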

> Support Async Processing in Streams
> ---
>
> Key: KAFKA-6989
> URL: https://issues.apache.org/jira/browse/KAFKA-6989
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip
>
> Today Kafka Streams uses a single-thread-per-task architecture to achieve 
> embarrassingly parallel processing and good isolation. However, there are a 
> couple of scenarios where async processing may be preferable:
> 1) External resource access or heavy IOs with high-latency. Suppose you need 
> to access a remote REST api, read / write to an external store, or do a heavy 
> disk IO operation that may result in high latency. Current threading model 
> would block any other records before this record's done, waiting on the 
> remote call / IO to finish.
> 2) Robust failure handling with retries. Imagine the app-level processing of 
> a (non-corrupted) record fails (e.g. the user attempted to do a RPC to an 
> external system, and this call failed), and failed records are moved into a 
> separate "retry" topic. How can you process such failed records in a scalable 
> way? For example, imagine you need to implement a retry policy such as "retry 
> with exponential backoff". Here, you have the problem that 1. you can't 
> really pause processing a single record because this will pause the 
> processing of the full stream (bottleneck!) and 2. there is no 
> straight-forward way to "sort" failed records based on their "next retry 
> time" (think: priority queue).
> 3) Delayed processing. One use case is delaying re-processing (e.g. "delay 
> re-processing this event for 5 minutes") as mentioned in 2); another is 
> implementing a scheduler, e.g. doing some additional operations later based 
> on the processed record. Zalando Dublin, for example, are implementing a 
> distributed web crawler this way. Note that although this feature can be 
> handled in punctuation, it is not well aligned with our current offset 
> committing behavior, which always advances the offset once the record has 
> finished traversing the topology.
> I'm thinking of two options to support this feature:
> 1. Make the commit() mechanism more customizable so that users can 
> implement multi-threaded processing themselves: users can always do async 
> processing in the Processor API by spawning a thread pool, e.g., but the key 
> is that the offset to be committed should only be advanced once such async 
> processing is done. This is a light-weight approach: we provide all the 
> pieces and tools, and users stack them up to build their own LEGOs.
> 2. Provide a general API to do async processing in the Processor API, and 
> take care of the offset committing internally. This is a heavy-weight 
> approach: the API may not cover all async scenarios, but it is an easy way 
> to cover the majority of remaining scenarios, and users do not need to worry 
> about internal implementation details such as offsets and fault tolerance.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6990) CommitFailedException; this task may be no longer owned by the thread

2018-10-08 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16640256#comment-16640256
 ] 

Ryanne Dolan edited comment on KAFKA-6990 at 10/8/18 5:54 PM:
--

Usually this is due to records being processed slowly and sequentially, which 
seems to be the case here also. Consider processing records asynchronously / in 
parallel if you can, so that you can process the batch of records faster. If 
you can't do that, e.g. order matters, then reduce max.poll.records to 
something small s.t. poll() returns only a few records for you to process at a 
time.

ETA: Even so, you can see this behavior when your consumer is unable to keep up 
with processing records before they fall off the end of the stream due to a 
retention policy. In this case, you'll need to increase parallelism somehow, 
e.g. with more partitions and more consumers -- reducing max.poll.records won't 
help.
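
A sketch of the tuning suggested above (values are illustrative): a smaller 
max.poll.records keeps each poll() batch small enough to finish within 
max.poll.interval.ms, so the consumer is not kicked out of the group 
mid-processing.

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class SmallBatchConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        // poll() returns at most this many records per call (default 500)
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        // processing of one batch must finish within this window
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300_000);
        return props;
    }
}
{code}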


was (Author: ryannedolan):
Usually this is due to records being processed slowly and sequentially, which 
seems to be the case here also. Consider processing records asynchronously / in 
parallel if you can, so that you can process the batch of records faster. If 
you can't do that, e.g. order matters, then reduce max.poll.records to 
something small s.t. poll() returns only a few records for you to process at a 
time.

> CommitFailedException; this task may be no longer owned by the thread
> -
>
> Key: KAFKA-6990
> URL: https://issues.apache.org/jira/browse/KAFKA-6990
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
>Reporter: Eugen Feller
>Priority: Blocker
>
> We are seeing a lot of CommitFailedExceptions on one of our Kafka stream 
> apps. Running Kafka Streams 0.11.0.1 and Kafka Broker 0.10.2.1. Full error 
> message:
> {code:java}
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] INFO 
> org.apache.kafka.streams.KafkaStreams - stream-client 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d] State transition from 
> REBALANCING to RUNNING.
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_0] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-0=OffsetAndMetadata{offset=11, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-0=OffsetAndMetadata{offset=26,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_0 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_1 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_1] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-1=OffsetAndMetadata{offset=38,
>  metadata=''}, sightings_sighting_byclientmac_0-1=OffsetAndMetadata{offset=1, 
> metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_2] Failed 
> offset commits 
> {mactocontactmappings_contactmapping_byclientmac_0-2=OffsetAndMetadata{offset=8,
>  metadata=''}, 
> sightings_sighting_byclientmac_0-2=OffsetAndMetadata{offset=24, metadata=''}} 
> due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] Failed to commit 
> stream task 0_2 during commit state due to CommitFailedException; this task 
> may be no longer owned by the thread
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.StreamTask - task [0_3] Failed 
> offset commits 
> {sightings_sighting_byclientmac_0-3=OffsetAndMetadata{offset=21, 
> metadata=''}, 
> mactocontactmappings_contactmapping_byclientmac_0-3=OffsetAndMetadata{offset=102,
>  metadata=''}} due to CommitFailedException
> [visits-bdb209f1-c9dd-404b-9233-064f2fb4227d-StreamThread-1] WARN 
> org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread 
> [visits

[jira] [Updated] (KAFKA-7491) Kafka streams and Kafka streams test utils have split packages

2018-10-08 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7491:

Component/s: streams

> Kafka streams and Kafka streams test utils have split packages
> --
>
> Key: KAFKA-7491
> URL: https://issues.apache.org/jira/browse/KAFKA-7491
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Robin Van Praet
>Priority: Major
>
> When trying to test a Kafka Streams application on JDK 9+ using the module 
> path, (test) compilation errors occur.
> The TopologyTestDriver cannot be located in kafka-streams-test-utils because 
> package 'org.apache.kafka.streams' is already provided by module 
> kafka-streams.
> Please make sure that packages are not re-used between production libraries 
> and test util libraries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7491) Kafka streams and Kafka streams test utils have split packages

2018-10-08 Thread John Roesler (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-7491:

Issue Type: Bug  (was: Improvement)

> Kafka streams and Kafka streams test utils have split packages
> --
>
> Key: KAFKA-7491
> URL: https://issues.apache.org/jira/browse/KAFKA-7491
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Robin Van Praet
>Priority: Major
>
> When trying to test a Kafka Streams application on JDK 9+ using the module 
> path, (test) compilation errors occur.
> The TopologyTestDriver cannot be located in kafka-streams-test-utils because 
> package 'org.apache.kafka.streams' is already provided by module 
> kafka-streams.
> Please make sure that packages are not re-used between production libraries 
> and test util libraries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7491) Kafka streams and Kafka streams test utils have split packages

2018-10-08 Thread John Roesler (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16642194#comment-16642194
 ] 

John Roesler commented on KAFKA-7491:
-

Interesting... That's a pain.

 

Can you provide some steps so we can reproduce this problem? We build and test 
Kafka (including Streams) on Java 10, and I don't think we encounter this issue.

 

It sounds like we may need to break compatibility to fix this, but I'd like to 
have a repro in hand to explore our options.

 

Thanks (and sorry),

-John

> Kafka streams and Kafka streams test utils have split packages
> --
>
> Key: KAFKA-7491
> URL: https://issues.apache.org/jira/browse/KAFKA-7491
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Robin Van Praet
>Priority: Major
>
> When trying to test a Kafka Streams application on JDK 9+ using the module 
> path, (test) compilation errors occur.
> The TopologyTestDriver cannot be located in kafka-streams-test-utils because 
> package 'org.apache.kafka.streams' is already provided by module 
> kafka-streams.
> Please make sure that packages are not re-used between production libraries 
> and test util libraries.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7491) Kafka streams and Kafka streams test utils have split packages

2018-10-08 Thread Robin Van Praet (JIRA)
Robin Van Praet created KAFKA-7491:
--

 Summary: Kafka streams and Kafka streams test utils have split 
packages
 Key: KAFKA-7491
 URL: https://issues.apache.org/jira/browse/KAFKA-7491
 Project: Kafka
  Issue Type: Improvement
Reporter: Robin Van Praet


When trying to test a Kafka Streams application on JDK 9+ using the module 
path, (test) compilation errors occur.

The TopologyTestDriver cannot be located in kafka-streams-test-utils because 
package 'org.apache.kafka.streams' is already provided by module kafka-streams.

Please make sure that packages are not re-used between production libraries and 
test util libraries.
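
A hypothetical module-info.java showing the failure mode (the module names are 
the automatic names the JDK derives from the kafka-streams and 
kafka-streams-test-utils jar filenames):

{code:java}
module my.streams.app.test {
    requires kafka.streams;            // exports org.apache.kafka.streams
    requires kafka.streams.test.utils; // also contains org.apache.kafka.streams
    // javac rejects this: the module system does not allow reading the same
    // package ("split package") from two different modules
}
{code}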



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7490) Kafka libraries should have a valid module name

2018-10-08 Thread Robin Van Praet (JIRA)
Robin Van Praet created KAFKA-7490:
--

 Summary: Kafka libraries should have a valid module name
 Key: KAFKA-7490
 URL: https://issues.apache.org/jira/browse/KAFKA-7490
 Project: Kafka
  Issue Type: Improvement
Reporter: Robin Van Praet


Currently, when running an application on Java's module path, it is impossible 
to use kafka as an automatic module.

For example:
{code:xml}
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
    <version>2.0.0</version>
</dependency>
{code}
This will result in the file kafka_2.12-2.0.0.jar.

The rules for Automatic module naming are:
 * use the 'Automatic-Module-Name' value from the Manifest file when present
 * convert the jar filename to a module name by stripping the version and 
converting some characters to a dot.

There is no 'Automatic-Module-Name' value in the Manifest, so the jar filename 
is used. However, it will be converted to the module name kafka.2.12, which is 
invalid because the segment '2' is not a Java identifier.

 

Please provide an 'Automatic-Module-Name' value in the Manifest file for all 
deliverables.
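
A quick way to reproduce this (assuming the jar sits in the working 
directory): asking the module system to derive the automatic module fails, 
because of the invalid derived name.

{code:java}
import java.lang.module.FindException;
import java.lang.module.ModuleFinder;
import java.nio.file.Paths;

public class AutomaticModuleNameCheck {
    public static void main(String[] args) {
        try {
            // derives the automatic module descriptor for every jar given
            ModuleFinder.of(Paths.get("kafka_2.12-2.0.0.jar")).findAll()
                    .forEach(ref -> System.out.println(ref.descriptor().name()));
        } catch (FindException e) {
            System.out.println("Not usable as an automatic module: " + e.getMessage());
        }
    }
}
{code}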



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7093) Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0

2018-10-08 Thread Kyle Xiong (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7093?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641687#comment-16641687
 ] 

Kyle Xiong commented on KAFKA-7093:
---

Seeing the same logs after restarting the brokers, only on the 
__consumer_offsets partitions.

> Kafka warn messages after upgrade from 0.11.0.1 to 1.1.0
> 
>
> Key: KAFKA-7093
> URL: https://issues.apache.org/jira/browse/KAFKA-7093
> Project: Kafka
>  Issue Type: Bug
>  Components: offset manager
>Affects Versions: 1.1.0
>Reporter: Suleyman
>Priority: Major
>
> I upgraded Kafka from version 0.11.0.1 to 1.1.0. After the upgrade, I'm 
> getting the warn message below very frequently.
> WARN Received a PartitionLeaderEpoch assignment for an epoch < latestEpoch. 
> This implies messages have arrived out of order. New: \{epoch:0, 
> offset:793868383}, Current: \{epoch:4, offset:792201264} for Partition: 
> __consumer_offsets-42 (kafka.server.epoch.LeaderEpochFileCache) 
> How can I resolve these warn messages? And why am I getting them?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7274) Incorrect subject credential used in inter-broker communication

2018-10-08 Thread Rajini Sivaram (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7274?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641649#comment-16641649
 ] 

Rajini Sivaram commented on KAFKA-7274:
---

[~xiaotao183] We can configure only one mechanism for inter-broker 
communication, and none of our built-in mechanisms requires username/password 
for a mechanism that is not used for inter-broker. So static jaas config as 
described in the docs for these mechanisms does work. The limitation is only on 
adding conflicting unused options. Feel free to submit a PR to clarify this in 
the docs.

Going forward, `sasl.jaas.config` options are actually more flexible on both 
the broker side and the client side. On the broker side, these are specified 
per mechanism; they can be stored encrypted in ZooKeeper, and they work without 
any conflicts for any mechanism since they can be prefixed with the listener 
and mechanism name.

> Incorrect subject credential used in inter-broker communication
> ---
>
> Key: KAFKA-7274
> URL: https://issues.apache.org/jira/browse/KAFKA-7274
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0
>Reporter: TAO XIAO
>Priority: Major
>
> We configured one broker setup to enable multiple SASL mechanisms using JAAS 
> config file but we failed to start up the broker.
>  
> Here is security section of server.properties
>  
> {noformat}
> listeners=SASL_PLAINTEXT://:9092
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
> sasl.mechanism.inter.broker.protocol=PLAIN{noformat}
>  
> JAAS file
>  
> {noformat}
> sasl_plaintext.KafkaServer {
>   org.apache.kafka.common.security.plain.PlainLoginModule required
>   username="admin"
>   password="admin-secret"
>   user_admin="admin-secret"
>   user_alice="alice-secret";
>   org.apache.kafka.common.security.scram.ScramLoginModule required
>   username="admin1"
>   password="admin-secret";
> };{noformat}
>  
> Exception we got
>  
> {noformat}
> [2018-08-10 12:12:13,070] ERROR [Controller id=0, targetBrokerId=0] 
> Connection to node 0 failed authentication due to: Authentication failed: 
> Invalid username or password 
> (org.apache.kafka.clients.NetworkClient){noformat}
>  
> If we changed to use broker configuration property we can start broker 
> successfully
>  
> {noformat}
> listeners=SASL_PLAINTEXT://:9092
> security.inter.broker.protocol=SASL_PLAINTEXT
> sasl.enabled.mechanisms=PLAIN,SCRAM-SHA-256
> sasl.mechanism.inter.broker.protocol=PLAIN
> listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule
>  required username="admin" password="admin-secret" user_admin="admin-secret" 
> user_alice="alice-secret";
> listener.name.sasl_plaintext.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule
>  required username="admin1" password="admin-secret";{noformat}
>  
> I believe this issue is caused by Kafka assigning all login modules to each 
> defined mechanism when using a JAAS file, which results in the Login class 
> adding the usernames defined in each login module to the same subject
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/JaasContext.java#L101]
>  
> [https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/security/authenticator/LoginManager.java#L63]
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7462) Kafka brokers cannot provide OAuth without a token

2018-10-08 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7462.
---
   Resolution: Fixed
Fix Version/s: (was: 2.2.0)
   2.1.0

> Kafka brokers cannot provide OAuth without a token
> --
>
> Key: KAFKA-7462
> URL: https://issues.apache.org/jira/browse/KAFKA-7462
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 2.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.1.0
>
>
> Like all other SASL mechanisms, OAUTHBEARER uses the same LoginModule 
> class on both the server side and the client side. But unlike PLAIN or SCRAM, 
> where client credentials are optional, OAUTHBEARER always requires a token. 
> So while with PLAIN/SCRAM the broker only needs to specify client 
> credentials if the mechanism is used for inter-broker communication, with 
> OAuth the broker requires client credentials even if OAuth is not used for 
> inter-broker communication. This is an issue with the default 
> `OAuthBearerUnsecuredLoginCallbackHandler` used on both the client side and 
> the server side. But more critically, it is an issue with 
> `OAuthBearerLoginModule`, which doesn't commit if token == null (commit() 
> returns false).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7462) Kafka brokers cannot provide OAuth without a token

2018-10-08 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641568#comment-16641568
 ] 

ASF GitHub Bot commented on KAFKA-7462:
---

rajinisivaram closed pull request #5733: KAFKA-7462: Make token optional for 
OAuthBearerLoginModule
URL: https://github.com/apache/kafka/pull/5733
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
index 1dcd1991aed..e3a78103560 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerLoginModule.java
@@ -236,6 +236,21 @@
  * @see SaslConfigs#SASL_LOGIN_REFRESH_BUFFER_SECONDS_DOC
  */
 public class OAuthBearerLoginModule implements LoginModule {
+
+/**
+ * Login state transitions:
+ *   Initial state: NOT_LOGGED_IN
+ *   login()  : NOT_LOGGED_IN => LOGGED_IN_NOT_COMMITTED
+ *   commit() : LOGGED_IN_NOT_COMMITTED => COMMITTED
+ *   abort()  : LOGGED_IN_NOT_COMMITTED => NOT_LOGGED_IN
+ *   logout() : Any state => NOT_LOGGED_IN
+ */
+private enum LoginState {
+NOT_LOGGED_IN,
+LOGGED_IN_NOT_COMMITTED,
+COMMITTED
+}
+
 /**
  * The SASL Mechanism name for OAuth 2: {@code OAUTHBEARER}
  */
@@ -248,6 +263,7 @@
 private OAuthBearerToken myCommittedToken = null;
 private SaslExtensions extensionsRequiringCommit = null;
 private SaslExtensions myCommittedExtensions = null;
+private LoginState loginState;
 
 static {
 OAuthBearerSaslClientProvider.initialize(); // not part of public API
@@ -266,17 +282,29 @@ public void initialize(Subject subject, CallbackHandler callbackHandler, Map
-for (Iterator<Object> iterator = subject.getPrivateCredentials().iterator(); iterator.hasNext();) {
-Object privateCredential = iterator.next();
-if (privateCredential == myCommittedToken) {
-iterator.remove();
-myCommittedToken = null;
-break;
+if (myCommittedToken != null) {
+log.info("Logging out my token; current committed token count = 
{}", committedTokenCount());
+for (Iterator iterator = 
subject.getPrivateCredentials().iterator(); iterator.hasNext(); ) {
+Object privateCredential = iterator.next();
+if (privateCredential == myCommittedToken) {
+iterator.remove();
+myCommittedToken = null;
+break;
+}
 }
-}
-log.info("Done logging out my token; committed token count is now {}", 
committedTokenCount());
+log.info("Done logging out my token; committed token count is now 
{}", committedTokenCount());
+} else
+log.debug("No tokens to logout for this login");
 
-log.info("Logging out my extensions");
-if (subject.getPublicCredentials().removeIf(e -> myCommittedExtensions 
== e))
-myCommittedExtensions = null;
-log.info("Done logging out my extensions");
+if (myCommittedExtensions != null) {
+log.info("Logging out my extensions");
+if (subject.getPublicCredentials().removeIf(e -> 
myCommittedExtensions == e))
+myCommittedExtensions = null;
+log.info("Done logging out my extensions");
+} else
+log.debug("No extensions to logout for this login");
 
+loginState = LoginState.NOT_LOGGED_IN;
 return true;
 }
 
 @Override
 public boolean commit() {
-if (tokenRequiringCommit == null) {
+if (loginState != LoginState.LOGGED_IN_NOT_COMMITTED) {
 if (log.isDebugEnabled())
 log.debug("Nothing here to commit");
 return false;
 }
 
-log.info("Committing my token; current committed token count = {}", 
committedTokenCount());
-subject.getPrivateCredentials().add(tokenRequiringCommit);
-myCommittedToken = tokenRequiringCommit;
-tokenRequiringCommit = null;
-log.info("Done committing my token; committed token count is now {}", 
committedTokenCount());
+if (tokenRequiringCommit != null) {
+log.info("Committing my token; current committed token count = 
{}", committedTokenCount());
+subject.getPrivateCredentials().add(tokenRequiringCommit);
+myCommittedToken = tokenRequiringCommit;
+   

[jira] [Created] (KAFKA-7489) ConnectDistributedTest is always running broker with dev version

2018-10-08 Thread Andras Katona (JIRA)
Andras Katona created KAFKA-7489:


 Summary: ConnectDistributedTest is always running broker with dev 
version
 Key: KAFKA-7489
 URL: https://issues.apache.org/jira/browse/KAFKA-7489
 Project: Kafka
  Issue Type: Test
  Components: KafkaConnect, system tests
Reporter: Andras Katona


h2. Test class
kafkatest.tests.connect.connect_distributed_test.ConnectDistributedTest

h2. Details
_test_broker_compatibility_ is +parametrized+ with different types of brokers, 
yet the version is passed as a string to _setup_services_, so KafkaService 
ends up initialised with the DEV version.

This is easy to fix: just wrap _broker_version_ with KafkaVersion
{panel}
self.setup_services(broker_version=KafkaVersion(broker_version),
 auto_create_topics=auto_create_topics, security_protocol=security_protocol)
{panel}

But the test then fails with the parameter LATEST_0_9, with the following error 
message:
{noformat}
Kafka Connect failed to start on node: ducker@ducker02 in condition mode: LISTEN
{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-6448) Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the annotation

2018-10-08 Thread Hongyuan Li (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6448?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641277#comment-16641277
 ] 

Hongyuan Li edited comment on KAFKA-6448 at 10/8/18 8:17 AM:
-

made a new pull request, [https://github.com/apache/kafka/pull/5758]

[~lindong]  [~rsivaram] [~damianguy] [~ijuma]

 


was (Author: hongyuan li):
made a new pull request, [https://github.com/apache/kafka/pull/5758]

[~lindong]  [~rsivaram] [~damianguy]

 

> Mx4jLoader#props.getBoolean("kafka_mx4jenable", false) conflict with the 
> annotation
> ---
>
> Key: KAFKA-6448
> URL: https://issues.apache.org/jira/browse/KAFKA-6448
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Hongyuan Li
>Priority: Minor
> Fix For: 2.2.0
>
> Attachments: KAFKA-6448-1.patch, KAFKA-6448-2.patch
>
>
> In the annotation, it says
> {code}This feature must be enabled with -Dmx4jenable=true{code}
> which is not compatible with the code
> {code}
> props.getBoolean("kafka_mx4jenable", false)
> {code}
> Patch KAFKA-6448-1.patch modifies the code, and KAFKA-6448-2.patch modifies 
> the annotation.
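
The mismatch in a runnable form (a sketch of the two lookups, not the actual 
Mx4jLoader code):

{code:java}
public class Mx4jFlagCheck {
    public static void main(String[] args) {
        // what the annotation tells users to set: -Dmx4jenable=true
        System.out.println("documented flag: " + Boolean.getBoolean("mx4jenable"));
        // what the code actually reads: kafka_mx4jenable
        System.out.println("actual lookup:   " + Boolean.getBoolean("kafka_mx4jenable"));
    }
}
{code}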



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)