[jira] [Updated] (KAFKA-10143) Can no longer change replication throttle with reassignment tool

2020-06-10 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson updated KAFKA-10143:

Description: 
Previously we could use --execute with the --throttle option in order to change 
the quota of an active reassignment. We seem to have lost this with KIP-455. 
The code has the following comment:
{code}
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
if (reassignPartitionsInProgress) {
  // Note: older versions of this tool would modify the broker quotas here (but not
  // topic quotas, for some reason).  This behavior wasn't documented in the --execute
  // command line help.  Since it might interfere with other ongoing reassignments,
  // this behavior was dropped as part of the KIP-455 changes.
  throw new TerseReassignmentFailureException(cannotExecuteBecauseOfExistingMessage)
}
{code}
Seems like it was a mistake to change this because it breaks compatibility. We 
probably have to revert. At the same time, we can make the intent clearer both 
in the code and in the command help output.

  was:
Previously we could use --execute with the --throttle option in order to change 
the quota of an active reassignment. We seem to have lost this with KIP-455. 
The code has the following comment:
{code}
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
if (reassignPartitionsInProgress) {
  // Note: older versions of this tool would modify the broker quotas here (but not
  // topic quotas, for some reason).  This behavior wasn't documented in the --execute
  // command line help.  Since it might interfere with other ongoing reassignments,
  // this behavior was dropped as part of the KIP-455 changes.
  throw new TerseReassignmentFailureException(cannotExecuteBecauseOfExistingMessage)
}
{code}
Seems like it was a mistake to change this because it breaks compatibility. We 
probably have to revert.


> Can no longer change replication throttle with reassignment tool
> 
>
> Key: KAFKA-10143
> URL: https://issues.apache.org/jira/browse/KAFKA-10143
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Priority: Blocker
> Fix For: 2.6.0
>
>
> Previously we could use --execute with the --throttle option in order to 
> change the quota of an active reassignment. We seem to have lost this with 
> KIP-455. The code has the following comment:
> {code}
> val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
> if (reassignPartitionsInProgress) {
>   // Note: older versions of this tool would modify the broker quotas here (but not
>   // topic quotas, for some reason).  This behavior wasn't documented in the --execute
>   // command line help.  Since it might interfere with other ongoing reassignments,
>   // this behavior was dropped as part of the KIP-455 changes.
>   throw new TerseReassignmentFailureException(cannotExecuteBecauseOfExistingMessage)
> }
> {code}
> Seems like it was a mistake to change this because it breaks compatibility. 
> We probably have to revert. At the same time, we can make the intent clearer 
> both in the code and in the command help output.
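
The compatible behavior described in the issue could be sketched roughly as below. This is a hypothetical, self-contained Java illustration and not the tool's actual (Scala) code: `ThrottleSketch`, `ExecuteDecision`, `decide`, and the parameter names are assumptions introduced here, and rejecting an in-progress reassignment when no throttle is given simply mirrors the exception shown in the snippet above.

```java
import java.util.OptionalLong;

// Hypothetical sketch: with an active reassignment, --execute plus --throttle
// only updates the throttle instead of failing outright.
final class ThrottleSketch {
    enum ExecuteDecision { START_REASSIGNMENT, UPDATE_THROTTLE_ONLY, REJECT_ALREADY_IN_PROGRESS }

    static ExecuteDecision decide(boolean reassignmentInProgress, OptionalLong throttleBytesPerSec) {
        if (!reassignmentInProgress) {
            // Nothing is running, so --execute starts a new reassignment.
            return ExecuteDecision.START_REASSIGNMENT;
        }
        // A reassignment is active: a supplied throttle is applied to it,
        // otherwise the request is rejected as it is after KIP-455.
        return throttleBytesPerSec.isPresent()
            ? ExecuteDecision.UPDATE_THROTTLE_ONLY
            : ExecuteDecision.REJECT_ALREADY_IN_PROGRESS;
    }
}
```

Keeping the decision in one pure function would also make it easy to state the intent in the command help output.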



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] aakashnshah commented on pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

2020-06-10 Thread GitBox


aakashnshah commented on pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#issuecomment-642418469


   I've updated the KIP: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors
 @rhauch @wicknicks 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9216.
--
  Reviewer: Randall Hauch
Resolution: Fixed

Merged to `trunk` the second PR, which enforces the `cleanup.policy` topic 
setting on Connect's three internal topics, and cherry-picked it to the `2.6` 
branch (for the upcoming 2.6.0). However, merging to earlier branches would 
require too many changes in integration tests.

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if the Connect config topic's partition count != 1?
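
The check proposed above can be sketched as a small fail-fast guard. This is only an illustration under assumptions: `ConfigTopicCheck` and `requireSinglePartition` are hypothetical names, the partition count is passed in rather than fetched from the cluster, and the exception type is a placeholder rather than whatever Connect actually throws.

```java
// Hypothetical sketch: refuse to start when the Connect config topic
// does not have exactly one partition.
final class ConfigTopicCheck {
    static void requireSinglePartition(String topic, int partitionCount) {
        if (partitionCount != 1) {
            throw new IllegalStateException(
                "Topic '" + topic + "' must have exactly 1 partition but has " + partitionCount);
        }
    }
}
```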





[jira] [Resolved] (KAFKA-9845) plugin.path property does not work with config provider

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9845.
--
Fix Version/s: 2.7.0
 Reviewer: Randall Hauch
   Resolution: Fixed

Merged to `trunk`, and backported to `2.6` (for upcoming 2.6.0), `2.5` (for 
upcoming 2.5.1), and `2.4` (for future 2.4.2).

> plugin.path property does not work with config provider
> ---
>
> Key: KAFKA-9845
> URL: https://issues.apache.org/jira/browse/KAFKA-9845
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 2.6.0, 2.4.2, 2.5.1, 2.7.0
>
>
> The config provider mechanism doesn't work if used for the {{plugin.path}} 
> property of a standalone or distributed Connect worker. This is because the 
> {{Plugins}} instance which performs plugin path scanning is created using the 
> raw worker config, pre-transformation (see 
> [ConnectStandalone|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L79]
>  and 
> [ConnectDistributed|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java#L91]).
> Unfortunately, because config providers are loaded as plugins, there's a 
> circular dependency issue here. The {{Plugins}} instance needs to be created 
> _before_ the {{DistributedConfig}}/{{StandaloneConfig}} is created in order 
> for the config providers to be loaded correctly, and the config providers 
> need to be loaded in order to perform their logic on any properties 
> (including the {{plugin.path}} property).
> There is no clear fix for this issue in the code base, and the only known 
> workaround is to refrain from using config providers for the {{plugin.path}} 
> property.
> A couple improvements could potentially be made to improve the UX when this 
> issue arises:
>  #  Alter the config logging performed by the {{DistributedConfig}} and 
> {{StandaloneConfig}} classes to _always_ log the raw value for the 
> {{plugin.path}} property. Right now, the transformed value is logged even 
> though it isn't used, which is likely to cause confusion.
>  # Issue a {{WARN}}- or even {{ERROR}}-level log message when it's detected 
> that the user is attempting to use config providers for the {{plugin.path}} 
> property, which states that config providers cannot be used for that specific 
> property, instructs them to change the value for the property accordingly, 
> and/or informs them of the actual value that the framework will use for that 
> property when performing plugin path scanning.
> We should _not_ throw an error on startup if this condition is detected, as 
> this could cause previously-functioning, benignly-misconfigured Connect 
> workers to fail to start after an upgrade.
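
The second suggestion above (warn instead of failing) could look roughly like the sketch below. Everything here is hypothetical: `PluginPathCheck`, its methods, and the warning text are made up for illustration, and the regular expression is a simplified assumption about the `${provider:[path:]key}` reference syntax rather than the pattern Connect itself uses.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: detect a config provider reference in the raw
// plugin.path value so that a warning can be logged at startup.
final class PluginPathCheck {
    // Assumed shape of a reference: ${provider:[path:]key}
    private static final Pattern PROVIDER_REF = Pattern.compile("\\$\\{[^}:]+:[^}]*\\}");

    static boolean usesConfigProvider(String rawPluginPath) {
        return rawPluginPath != null && PROVIDER_REF.matcher(rawPluginPath).find();
    }

    static String warningFor(String rawPluginPath) {
        return "Config providers cannot be used for plugin.path; the raw value '"
            + rawPluginPath + "' will be used for plugin path scanning";
    }
}
```

A caller would log `warningFor(raw)` at `WARN` level when `usesConfigProvider(raw)` is true and then continue starting up, in line with the request not to throw.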





[jira] [Resolved] (KAFKA-6942) Connect connectors api doesn't show versions of connectors

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-6942.
--
Resolution: Invalid

I'm going to close this as INVALID because the versions are available in the 
API, as noted above.

> Connect connectors api doesn't show versions of connectors
> --
>
> Key: KAFKA-6942
> URL: https://issues.apache.org/jira/browse/KAFKA-6942
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Priority: Minor
>  Labels: needs-kip
>
> Would be very useful to have the connector list API response also return the 
> version of the installed connectors.





[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-10 Thread GitBox


kkonstantine commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642405060


   retest this please







[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-10 Thread GitBox


kkonstantine commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642404708


   ok to test







[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-10 Thread GitBox


kkonstantine commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642404597


   retest this please







[GitHub] [kafka] C0urante commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-10 Thread GitBox


C0urante commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642402730


   I believe I've identified the cause of the failure in 
https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1048. I've pushed a new 
commit to fix the logic; it passed tests locally, so hopefully Jenkins will be 
happy with it.







[jira] [Resolved] (KAFKA-10115) Incorporate errors.tolerance with the Errant Record Reporter

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-10115.
---
  Reviewer: Randall Hauch
Resolution: Fixed

Merged to `2.6` rather than `trunk` (accidentally) and cherry-picked to `trunk`.

> Incorporate errors.tolerance with the Errant Record Reporter
> 
>
> Key: KAFKA-10115
> URL: https://issues.apache.org/jira/browse/KAFKA-10115
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Aakash Shah
>Assignee: Aakash Shah
>Priority: Major
> Fix For: 2.6.0
>
>
> The errors.tolerance config is currently not being used when using the Errant 
> Record Reporter. If errors.tolerance is none then the task should fail after 
> sending it to the DLQ in Kafka.
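
The intended interaction can be sketched as follows: the errant record is always sent to the DLQ first, and only then does a tolerance of none fail the task. This is a simplified stand-in, not Connect's implementation: `ErrantRecordSketch`, the in-memory list that plays the role of the DLQ topic, and the exception type are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: report to the DLQ first, then fail the task
// if errors.tolerance is "none".
final class ErrantRecordSketch {
    enum Tolerance { NONE, ALL }

    private final Tolerance tolerance;
    // In-memory stand-in for the dead letter queue topic.
    private final List<String> deadLetterQueue = new ArrayList<>();

    ErrantRecordSketch(Tolerance tolerance) {
        this.tolerance = tolerance;
    }

    void report(String record, Throwable error) {
        deadLetterQueue.add(record); // the record reaches the DLQ in both modes
        if (tolerance == Tolerance.NONE) {
            throw new IllegalStateException("Tolerance exceeded for record " + record, error);
        }
    }

    List<String> deadLetterQueue() {
        return deadLetterQueue;
    }
}
```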





[jira] [Resolved] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-9066.
--
Fix Version/s: 2.7.0
 Reviewer: Randall Hauch
   Resolution: Fixed

Merged to `trunk`, and backported to `2.6` (for upcoming 2.6.0). I'll file a 
separate issue to backport this to `2.5` (since we're in-progress on releasing 
2.5.1) and `2.4`.

> Kafka Connect JMX : source & sink task metrics missing for tasks in failed 
> state
> 
>
> Key: KAFKA-9066
> URL: https://issues.apache.org/jira/browse/KAFKA-9066
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Mikołaj Stefaniak
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0, 2.7.0
>
>
> h2. Overview
> Kafka Connect exposes various metrics via JMX. Those metrics can be exported, 
> e.g. by _Prometheus JMX Exporter_, for further processing.
> One of the crucial attributes is the connector's *task status.*
> According to the official Kafka docs, the status is available as the +status+ 
> attribute of the following MBean:
> {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"
>  status - The status of the connector task. One of 'unassigned', 'running', 
> 'paused', 'failed', or 'destroyed'.
> {quote}
> h2. Issue
> Generally +connector-task-metrics+ are exposed properly for tasks in +running+ 
> status but not exposed at all if the task is +failed+.
> A failed task *appears* properly with failed status when queried via the *REST API*:
>  
> {code:java}
> $ curl -X GET -u 'user:pass' http://kafka-connect.mydomain.com/connectors/customerconnector/status
> {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException: Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"}
> $ {code}
>  
> A failed task *doesn't appear* as a bean with the +connector-task-metrics+ type 
> when queried via *JMX*:
>  
> {code:java}
> $ echo "beans -d kafka.connect" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep connector=customerconnector
> kafka.connect:connector=customerconnector,task=0,type=task-error-metrics
> kafka.connect:connector=customerconnector,type=connector-metrics
> $
> {code}
> h2. Expected result
> A bean with the +connector-task-metrics+ type is expected to appear also 
> for tasks that have failed.
> Below is an example of how beans are properly registered for tasks in the 
> running state:
>  
> {code:java}
> $ echo "get -b kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent
> status = running;
> $
> {code}
>  
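
The same check can be expressed programmatically. The sketch below uses the standard `javax.management.ObjectName` pattern matching to decide whether a `connector-task-metrics` bean exists for a given connector and task. `TaskMetricsBeanCheck` and `hasTaskMetricsBean` are hypothetical names, and the bean names are passed in as plain strings (as they appear in the jmxterm output above) instead of being queried from a live MBean server.

```java
import java.util.List;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

// Hypothetical sketch: look for the connector-task-metrics bean of one task
// among a list of registered bean names.
final class TaskMetricsBeanCheck {
    static boolean hasTaskMetricsBean(List<String> registeredBeanNames, String connector, int task) {
        try {
            // The trailing ",*" makes this a property-list pattern, so extra keys are allowed.
            ObjectName pattern = new ObjectName(
                "kafka.connect:type=connector-task-metrics,connector=" + connector
                    + ",task=" + task + ",*");
            for (String name : registeredBeanNames) {
                if (pattern.apply(new ObjectName(name))) {
                    return true;
                }
            }
            return false;
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

With the bean names listed for the failed `customerconnector` task the method returns false, which is the symptom reported here; with the bean name of the running `sinkConsentSubscription-1000` task it returns true.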





[GitHub] [kafka] vvcephei commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


vvcephei commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438533186



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##
@@ -959,28 +992,23 @@ private void maybeRecordE2ELatency(final long 
recordTimestamp, final long now, f
 }
 }
 
-/**
- * Request committing the current task's state
- */
-void requestCommit() {
-commitRequested = true;
+public InternalProcessorContext processorContext() {
+return processorContext;
 }
 
-/**
- * Whether or not a request has been made to commit the current state
- */
-@Override
-public boolean commitRequested() {
-return commitRequested;
+public boolean hasRecordsQueued() {
+return numBuffered() > 0;
 }
 
+// visible for testing

Review comment:
   Can we avoid these comments? I've recently come across too many cases where 
they had become untrue, to the point where it just seems pointless to have 
them.
   
   I know you're preserving the comment that was there before while moving the 
method, but I think we should just clean up the comments as well.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
  */
 void completeRestoration();
 
-void addRecords(TopicPartition partition, Iterable> records);
-
-boolean commitNeeded();
-
-/**
- * @throws StreamsException fatal error, should close the thread
- */
-Map prepareCommit();
-
-void postCommit();
-
 void suspend();
 
 /**
- *
  * @throws StreamsException fatal error, should close the thread
  */
 void resume();
 
-/**
- * Must be idempotent.
- */
+void closeDirty();
+
 void closeClean();
 
-/**
- * Must be idempotent.
- */
-void closeDirty();
+
+// non-idempotent life-cycle methods
 
 /**
- * Updates input partitions and topology after rebalance
+ * Revive a closed task to a created one; should never throw an exception
  */
-void update(final Set topicPartitions, final Map> nodeToSourceTopics);
+void revive();
 
 /**
  * Attempt a clean close but do not close the underlying state
  */
 void closeAndRecycleState();
 
-/**
- * Revive a closed task to a created one; should never throw an exception
- */
-void revive();
-
-StateStore getStore(final String name);
-
-Set inputPartitions();
+void markChangelogAsCorrupted(final Collection partitions);
 
-/**
- * @return any changelog partitions associated with this task
- */
-Collection changelogPartitions();
 
-/**
- * @return the offsets of all the changelog partitions associated with 
this task,
- * indicating the current positions of the logged state stores of 
the task.
- */
-Map changelogOffsets();
+// runtime methods (using in RUNNING state)
 
-void markChangelogAsCorrupted(final Collection partitions);
+void addRecords(TopicPartition partition, Iterable> records);
 
-default Map purgeableOffsets() {
-return Collections.emptyMap();
+default boolean process(final long wallClockTime) {
+return false;
 }
 
 default void recordProcessBatchTime(final long processBatchTime) {}
 
 default void recordProcessTimeRatioAndBufferSize(final long 
allTaskProcessMs, final long now) {}
 
-default boolean process(final long wallClockTime) {
+default boolean maybePunctuateStreamTime() {
 return false;
 }
 
-default boolean commitRequested() {
+default boolean maybePunctuateSystemTime() {
 return false;
 }
 
-default boolean maybePunctuateStreamTime() {
+boolean commitNeeded();
+
+default boolean commitRequested() {
 return false;
 }
 
-default boolean maybePunctuateSystemTime() {
-return false;
+/**
+ * @throws StreamsException fatal error, should close the thread
+ */
+Map prepareCommit();
+
+void postCommit();
+
+default Map purgeableOffsets() {
+return Collections.emptyMap();
 }
 
+
+// task status inquiry
+
+TaskId id();
+
+State state();
+
+boolean isActive();
+
+/**
+ * Updates input partitions after a rebalance
+ */
+void updateInputPartitions(final Set topicPartitions, 
final Map> nodeToSourceTopics);
+
+Set inputPartitions();
+
+/**
+ * @return any changelog partitions associated with this task
+ */
+Collection changelogPartitions();
+
+
+// IQ related methods

Review comment:
   I appreciate that you've taken the time to organize these methods, and 
that these comments are an attempt to make sure they stay organized, but I'm 
afraid that they'll just become misleading over time, the way that the "for 

[GitHub] [kafka] mjsax commented on pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#issuecomment-642396153


   Closing in favor of #8852







[GitHub] [kafka] mjsax closed pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax closed pull request #8851:
URL: https://github.com/apache/kafka/pull/8851


   







[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533555



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
  */
 void completeRestoration();
 
-void addRecords(TopicPartition partition, Iterable> records);
-
-boolean commitNeeded();
-
-/**
- * @throws StreamsException fatal error, should close the thread
- */
-Map prepareCommit();
-
-void postCommit();
-
 void suspend();
 
 /**
- *
  * @throws StreamsException fatal error, should close the thread
  */
 void resume();
 
-/**
- * Must be idempotent.
- */
+void closeDirty();
+
 void closeClean();
 
-/**
- * Must be idempotent.
- */
-void closeDirty();
+
+// non-idempotent life-cycle methods
 
 /**
- * Updates input partitions and topology after rebalance
+ * Revive a closed task to a created one; should never throw an exception
  */
-void update(final Set topicPartitions, final Map> nodeToSourceTopics);
+void revive();
 
 /**
  * Attempt a clean close but do not close the underlying state
  */
 void closeAndRecycleState();
 
-/**
- * Revive a closed task to a created one; should never throw an exception
- */
-void revive();
-
-StateStore getStore(final String name);
-
-Set inputPartitions();
+void markChangelogAsCorrupted(final Collection partitions);
 
-/**
- * @return any changelog partitions associated with this task
- */
-Collection changelogPartitions();
 
-/**
- * @return the offsets of all the changelog partitions associated with 
this task,
- * indicating the current positions of the logged state stores of 
the task.
- */
-Map changelogOffsets();
+// runtime methods (using in RUNNING state)
 
-void markChangelogAsCorrupted(final Collection partitions);
+void addRecords(TopicPartition partition, Iterable> records);
 
-default Map purgeableOffsets() {
-return Collections.emptyMap();
+default boolean process(final long wallClockTime) {
+return false;
 }
 
 default void recordProcessBatchTime(final long processBatchTime) {}
 
 default void recordProcessTimeRatioAndBufferSize(final long 
allTaskProcessMs, final long now) {}
 
-default boolean process(final long wallClockTime) {
+default boolean maybePunctuateStreamTime() {
 return false;
 }
 
-default boolean commitRequested() {
+default boolean maybePunctuateSystemTime() {
 return false;
 }
 
-default boolean maybePunctuateStreamTime() {
+boolean commitNeeded();
+
+default boolean commitRequested() {
 return false;
 }
 
-default boolean maybePunctuateSystemTime() {
-return false;
+/**
+ * @throws StreamsException fatal error, should close the thread
+ */
+Map prepareCommit();
+
+void postCommit();
+
+default Map purgeableOffsets() {
+return Collections.emptyMap();
 }
 
+
+// task status inquiry
+
+TaskId id();
+
+State state();
+
+boolean isActive();
+
+/**
+ * Updates input partitions after a rebalance
+ */
+void updateInputPartitions(final Set topicPartitions, 
final Map> nodeToSourceTopics);

Review comment:
   Renamed from `update` to `updateInputPartitions`









[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533429



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
 
-TaskId id();

Review comment:
   Group and order methods (compare the in-line comments). Subclasses 
inherit this ordering.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
 
-TaskId id();
 
-State state();
 
-boolean isActive();
-
-boolean isClosed();

Review comment:
   This method is unused and removed. That is the only actual change.









[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533374



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -57,30 +59,31 @@
  * @param stateDirectory the {@link StateDirectory} created by the thread
  */
 StandbyTask(final TaskId id,
-final Set partitions,
 final ProcessorTopology topology,
-final StreamsConfig config,
-final StreamsMetricsImpl metrics,
-final ProcessorStateManager stateMgr,
 final StateDirectory stateDirectory,
+final ProcessorStateManager stateMgr,
+final Set partitions,
+final StreamsConfig config,
+final InternalProcessorContext processorContext,
 final ThreadCache cache,
-final InternalProcessorContext processorContext) {
+final StreamsMetricsImpl metrics) {
 super(id, topology, stateDirectory, stateMgr, partitions);

Review comment:
   Put "super parameter first" in constructor list









[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533205



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
 private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
 
 protected final TaskId id;
+protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
 
+protected Set inputPartitions;
+
 AbstractTask(final TaskId id,
  final ProcessorTopology topology,
  final StateDirectory stateDirectory,
  final ProcessorStateManager stateMgr,
  final Set inputPartitions) {
 this.id = id;
-this.stateMgr = stateMgr;
 this.topology = topology;
-this.inputPartitions = inputPartitions;
+this.stateMgr = stateMgr;
 this.stateDirectory = stateDirectory;
+this.inputPartitions = inputPartitions;

Review comment:
   Align assignment to parameter order.









[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533174



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
 private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
 
 protected final TaskId id;
+protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
 
+protected Set inputPartitions;

Review comment:
   Align members to constructor parameter order, and group final / mutable.









[GitHub] [kafka] mjsax opened a new pull request #8852: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax opened a new pull request #8852:
URL: https://github.com/apache/kafka/pull/8852


   No functional change; pure code cleanup.
   
   Call for review @vvcephei







[GitHub] [kafka] brary commented on pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)

2020-06-10 Thread GitBox


brary commented on pull request #8200:
URL: https://github.com/apache/kafka/pull/8200#issuecomment-642394605


   LGTM!







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642394105


   Test this please







[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531936



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
  */
 void completeRestoration();
 
-void addRecords(TopicPartition partition, Iterable> records);
-
-boolean commitNeeded();
-
-/**
- * @throws StreamsException fatal error, should close the thread
- */
-Map prepareCommit();
-
-void postCommit();
-
 void suspend();
 
 /**
- *
  * @throws StreamsException fatal error, should close the thread
  */
 void resume();
 
-/**
- * Must be idempotent.
- */
+void closeDirty();
+
 void closeClean();
 
-/**
- * Must be idempotent.
- */
-void closeDirty();
+
+// non-idempotent life-cycle methods
 
 /**
- * Updates input partitions and topology after rebalance
+ * Revive a closed task to a created one; should never throw an exception
  */
-void update(final Set topicPartitions, final Map> nodeToSourceTopics);
+void revive();
 
 /**
  * Attempt a clean close but do not close the underlying state
  */
 void closeAndRecycleState();
 
-/**
- * Revive a closed task to a created one; should never throw an exception
- */
-void revive();
-
-StateStore getStore(final String name);
-
-Set inputPartitions();
+void markChangelogAsCorrupted(final Collection partitions);
 
-/**
- * @return any changelog partitions associated with this task
- */
-Collection changelogPartitions();
 
-/**
- * @return the offsets of all the changelog partitions associated with this task,
- * indicating the current positions of the logged state stores of the task.
- */
-Map changelogOffsets();
+// runtime methods (using in RUNNING state)
 
-void markChangelogAsCorrupted(final Collection partitions);
+void addRecords(TopicPartition partition, Iterable> records);
 
-default Map purgeableOffsets() {
-return Collections.emptyMap();
+default boolean process(final long wallClockTime) {
+return false;
 }
 
 default void recordProcessBatchTime(final long processBatchTime) {}
 
 default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
 
-default boolean process(final long wallClockTime) {
+default boolean maybePunctuateStreamTime() {
 return false;
 }
 
-default boolean commitRequested() {
+default boolean maybePunctuateSystemTime() {
 return false;
 }
 
-default boolean maybePunctuateStreamTime() {
+boolean commitNeeded();
+
+default boolean commitRequested() {
 return false;
 }
 
-default boolean maybePunctuateSystemTime() {
-return false;
+/**
+ * @throws StreamsException fatal error, should close the thread
+ */
+Map prepareCommit();
+
+void postCommit();
+
+default Map purgeableOffsets() {
+return Collections.emptyMap();
 }
 
+
+// task status inquiry
+
+TaskId id();
+
+State state();
+
+boolean isActive();
+
+/**
+ * Updates input partitions after a rebalance
+ */
+void updateInputPartitions(final Set topicPartitions, final Map> nodeToSourceTopics);

Review comment:
   Renamed from `update` to `updateInputPartitions`









[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531698



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
 
-TaskId id();
 
-State state();
 
-boolean isActive();
-
-boolean isClosed();

Review comment:
   This method is unused and removed. That is the only actual change.









[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531571



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##
@@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
 
-TaskId id();

Review comment:
   Group and order methods (compare in-line comments). Sub-classes inherit this ordering.









[GitHub] [kafka] brary commented on a change in pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)

2020-06-10 Thread GitBox


brary commented on a change in pull request #8200:
URL: https://github.com/apache/kafka/pull/8200#discussion_r438531599



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
##
@@ -18,13 +18,11 @@
 
 
 /**
- * Indicates that there was a problem when trying to access a
- * {@link org.apache.kafka.streams.processor.StateStore StateStore}, i.e, the Store is no longer valid because it is
- * closed or doesn't exist any more due to a rebalance.
- * 
- * These exceptions may be transient, i.e., during a rebalance it won't be possible to query the stores as they are
- * being (re)-initialized. Once the rebalance has completed the stores will be available again. Hence, it is valid
- * to backoff and retry when handling this exception.
+ * Indicates that there was a problem when trying to access a {@link org.apache.kafka.streams.processor.StateStore StateStore}.
+ * InvalidStateStoreException not thrown directly but only following sub-classes:
+ * {@link StreamsNotStartedException}, {@link StreamsRebalancingException},
+ * {@link StateStoreMigratedException}, {@link StateStoreNotAvailableException},
+ * {@link UnknownStateStoreException}, {@link InvalidStateStorePartitionException}
  */
 public class InvalidStateStoreException extends StreamsException {

Review comment:
   Sure @vitojeng . cc @mjsax 
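
   A minimal sketch of how a caller might tell these cases apart, assuming the sub-classes listed in the Javadoc above. The nested exception types below are local stand-ins that only mirror those names so the snippet compiles on its own; they are not the Kafka Streams classes. The reaction chosen for each case (`classify` and its return strings) is a hypothetical policy for illustration, not behavior confirmed in this thread.

```java
// Sketch only: the nested exception types are local stand-ins that mirror the
// names in the Javadoc above; they are NOT the Kafka Streams classes.
final class StoreQueryErrorSketch {

    static class InvalidStateStoreException extends RuntimeException {
        InvalidStateStoreException(final String message) {
            super(message);
        }
    }

    static class StreamsRebalancingException extends InvalidStateStoreException {
        StreamsRebalancingException(final String message) {
            super(message);
        }
    }

    static class StateStoreMigratedException extends InvalidStateStoreException {
        StateStoreMigratedException(final String message) {
            super(message);
        }
    }

    static class UnknownStateStoreException extends InvalidStateStoreException {
        UnknownStateStoreException(final String message) {
            super(message);
        }
    }

    // Hypothetical policy: map each sub-class to the reaction a caller might choose.
    static String classify(final InvalidStateStoreException e) {
        if (e instanceof StreamsRebalancingException) {
            return "retry-after-backoff";
        }
        if (e instanceof StateStoreMigratedException) {
            return "rediscover-store-location";
        }
        if (e instanceof UnknownStateStoreException) {
            return "fail-fast";
        }
        // Any other sub-class is left unclassified in this sketch.
        return "unclassified";
    }

    public static void main(final String[] args) {
        System.out.println(classify(new StreamsRebalancingException("rebalancing")));
    }
}
```

   Catching the specific sub-class lets a caller separate a condition that may pass on its own from one that needs a different lookup or a code fix, which a single catch of the base type cannot do.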









[jira] [Commented] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment

2020-06-10 Thread Yu Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132901#comment-17132901
 ] 

Yu Wang commented on KAFKA-9841:


Thank you for checking [~vvcephei]  and thank you for your help [~kkonstantine].

> Connector and Task duplicated when a worker join with old generation 
> assignment
> ---
>
> Key: KAFKA-9841
> URL: https://issues.apache.org/jira/browse/KAFKA-9841
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1, 2.4.1
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> When IncrementalCooperativeAssignor is used to assign connectors and tasks, 
> suppose a worker 'W' has a connection issue with the coordinator.
> During the connection issue, the connectors/tasks on 'W' are assigned to the 
> other workers.
> When the connection issue disappears, 'W' joins the group with an old 
> generation assignment. The group leader then gets duplicated 
> connectors/tasks in the metadata sent by the workers, but the duplicated 
> connectors/tasks are not revoked.
>  
> Generation 3:
> Worker1:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], 
> taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
> with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>  
> Generation 4:
> Worker1:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> 

[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531045



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
 private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
 
 protected final TaskId id;
+protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
 
+protected Set inputPartitions;

Review comment:
   Align members to constructor parameter order, and group `final` / 
mutable.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
##
@@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;
 
 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
 
 public abstract class AbstractTask implements Task {
 private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
 
 protected final TaskId id;
+protected final ProcessorTopology topology;
 protected final StateDirectory stateDirectory;
 protected final ProcessorStateManager stateMgr;
 
+protected Set inputPartitions;
+
 AbstractTask(final TaskId id,
  final ProcessorTopology topology,
  final StateDirectory stateDirectory,
  final ProcessorStateManager stateMgr,
  final Set inputPartitions) {
 this.id = id;
-this.stateMgr = stateMgr;
 this.topology = topology;
-this.inputPartitions = inputPartitions;
+this.stateMgr = stateMgr;
 this.stateDirectory = stateDirectory;
+this.inputPartitions = inputPartitions;

Review comment:
   Align assignment to parameter order.
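
   The convention in the two review comments above can be sketched as follows. `MemberOrderSketch` and its `String` fields are hypothetical and stand in for the real task types; the snippet only illustrates the ordering, not the actual `AbstractTask` class.

```java
import java.util.Set;

// Hypothetical class, not AbstractTask: it only illustrates the ordering convention.
class MemberOrderSketch {
    // final members first, declared in constructor-parameter order
    protected final String id;
    protected final String topology;
    protected final String stateDirectory;

    // mutable members grouped after the final ones
    protected Set<String> inputPartitions;

    MemberOrderSketch(final String id,
                      final String topology,
                      final String stateDirectory,
                      final Set<String> inputPartitions) {
        // assignments follow the parameter order, one per parameter
        this.id = id;
        this.topology = topology;
        this.stateDirectory = stateDirectory;
        this.inputPartitions = inputPartitions;
    }
}
```

   Keeping declarations, parameters, and assignments in one order makes a missing or swapped assignment easier to spot in review.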









[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure

2020-06-10 Thread Rohan Kulkarni (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132899#comment-17132899
 ] 

Rohan Kulkarni commented on KAFKA-9270:
---

[~mjsax] - Sure. I will track the other issue KAFKA-9274

> KafkaStream crash on offset commit failure
> --
>
> Key: KAFKA-9270
> URL: https://issues.apache.org/jira/browse/KAFKA-9270
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: Rohan Kulkarni
>Priority: Critical
>
> On our production server we intermittently observe Kafka Streams crashing 
> with a TimeoutException while committing offsets. The only workaround seems to 
> be restarting the application, which is not a desirable solution for a 
> production environment.
>  
> We have already implemented a ProductionExceptionHandler, but it does not 
> seem to address this.
>  
> Please provide a fix for this or a viable workaround.
>  
> +Application side logs:+
> 2019-11-17 08:28:48.055 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks 
> [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373]
>  - stream-thread 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to 
> commit stream task 0_1 due to the following error:*
>  *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}}
>  
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1
> 2019-11-17 08:29:00.891 + 
> [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] -  
>   [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: 
> AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1
> TaskManager 
> MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: 
> HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions 
> = [], controller = null) Active tasks: Running: Suspended: Restoring: New: 
> Standby tasks: Running: Suspended: Restoring: New:
>  org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired 
> before successfully committing offsets* 
> \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}}
>  
> +Kafka broker logs:+
> [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f* 
> (org.apache.zookeeper.ClientCnxn)
>  [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from 
> server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
>  
> Regards,
> Rohan



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches

2020-06-10 Thread Randall Hauch (Jira)
Randall Hauch created KAFKA-10146:
-

 Summary: Backport KAFKA-9066 to 2.5 and 2.4 branches
 Key: KAFKA-10146
 URL: https://issues.apache.org/jira/browse/KAFKA-10146
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Randall Hauch
Assignee: Randall Hauch
 Fix For: 2.4.2, 2.5.2


KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so this 
was not backported at the time. However, once 2.5.1 is out the door, the 
`775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` 
branches.





[GitHub] [kafka] mjsax opened a new pull request #8851: MINOR: code cleanup for Kafka Streams task classes

2020-06-10 Thread GitBox


mjsax opened a new pull request #8851:
URL: https://github.com/apache/kafka/pull/8851


   No functional change. Pure code cleanup.
   
   Call for review @vvcephei 







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642392428


   Test this please







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391750


   Test this please







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391773











[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391327


   Test this please







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391433


   Test this please







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391252


   Test this please







[GitHub] [kafka] C0urante commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-10 Thread GitBox


C0urante commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642390162


   Fixed the merge conflicts; should be safe to do a new test run now.







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642388373


   Test this please







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642388044


   Test this please







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642387968


   Test this please







[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642387910


   Test this please







[GitHub] [kafka] rhauch merged pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter

2020-06-10 Thread GitBox


rhauch merged pull request #8829:
URL: https://github.com/apache/kafka/pull/8829


   







[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active

2020-06-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Description: 
This ticket was initially just to write an integration test, but I escalated it 
to a blocker and changed the title when the integration test actually surfaced 
two bugs:
 # Offset positions were not reported for in-memory stores, so tasks with 
in-memory stores would never be considered as "caught up" and could not take 
over active processing, preventing clusters from ever achieving balance. This 
is a regression in 2.6.
 # When the TaskAssignor decided to switch active processing from a former 
owner to a new one that had a standby, the lower-level cooperative rebalance 
protocol would first de-schedule the task completely, and only later would 
assign it to the new owner. For in-memory stores, this caused the standby state 
not to be re-used, and for persistent stores, it created a window in which the 
cleanup thread might delete the state directory. In both cases, even though the 
instance previously had a standby, once it got the active task, it still had to 
restore the entire state from the changelog.

  was:
This ticket was initially just to write an integration test, but I escalated it 
to a blocker and changed the title when the integration test actually surfaced 
two bugs:

1. Offset positions were not reported for in-memory stores,


> Standby state isn't always re-used when transitioning to active
> ---
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0, 2.7.0
>
>
> This ticket was initially just to write an integration test, but I escalated 
> it to a blocker and changed the title when the integration test actually 
> surfaced two bugs:
>  # Offset positions were not reported for in-memory stores, so tasks with 
> in-memory stores would never be considered as "caught up" and could not take 
> over active processing, preventing clusters from ever achieving balance. This 
> is a regression in 2.6.
>  # When the TaskAssignor decided to switch active processing from a former 
> owner to a new one that had a standby, the lower-level cooperative rebalance 
> protocol would first de-schedule the task completely, and only later would 
> assign it to the new owner. For in-memory stores, this caused the standby 
> state not to be re-used, and for persistent stores, it created a window in 
> which the cleanup thread might delete the state directory. In both cases, 
> even though the instance previously had a standby, once it got the active 
> task, it still had to restore the entire state from the changelog.
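
The first bug in the description above can be sketched as follows, under the assumption that a task counts as "caught up" when the summed lag between the changelog end offsets and the offsets the client reports stays within a threshold. All names here (`CaughtUpSketch`, `isCaughtUp`, `acceptableLag`) are hypothetical and are not taken from the Kafka Streams code; the sketch only shows why a store that never reports an offset can never qualify.

```java
import java.util.Map;

// Hypothetical illustration; not the Kafka Streams assignor logic.
final class CaughtUpSketch {

    // Returns true only if every changelog partition has a reported offset and
    // the summed lag across partitions is within acceptableLag.
    static boolean isCaughtUp(final Map<String, Long> endOffsets,
                              final Map<String, Long> reportedOffsets,
                              final long acceptableLag) {
        long totalLag = 0L;
        for (final Map.Entry<String, Long> end : endOffsets.entrySet()) {
            final Long reported = reportedOffsets.get(end.getKey());
            if (reported == null) {
                // No position was reported for this partition, so in this
                // sketch the task can never be considered caught up.
                return false;
            }
            totalLag += Math.max(0L, end.getValue() - reported);
        }
        return totalLag <= acceptableLag;
    }
}
```

With a check of this shape, a task whose store never reports a position fails no matter how much state it has actually restored, which matches the symptom described for in-memory stores.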





[GitHub] [kafka] vitojeng commented on pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)

2020-06-10 Thread GitBox


vitojeng commented on pull request #8200:
URL: https://github.com/apache/kafka/pull/8200#issuecomment-642387584


   Got it!







[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup

2020-06-10 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132887#comment-17132887
 ] 

Randall Hauch commented on KAFKA-9216:
--

Thanks, [~ChrisEgerton]. I think we came to consensus on the PR by improving 
the error message with better instructions.

> Enforce connect internal topic configuration at startup
> ---
>
> Key: KAFKA-9216
> URL: https://issues.apache.org/jira/browse/KAFKA-9216
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 0.11.0.0
>Reporter: Randall Hauch
>Assignee: Evelyn Bayes
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Users sometimes configure Connect's internal topic for configurations with 
> more than one partition. One partition is expected, however, and using more 
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, 
> groupId=td-connect-server] Current config state offset 284 does not match 
> group assignment 274. Forcing rebalance. 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and 
> prevent the worker from starting if the Connect config topic's partition 
> count != 1?
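
The startup check requested in the description can be sketched in plain Java. This is only an illustration under stated assumptions: `ConfigTopicPartitionCheck` and `requireSinglePartition` are hypothetical names, the partition count is passed in rather than looked up through an admin client, and `IllegalStateException` stands in for whatever exception the worker would actually raise. It is not the change that was merged for this ticket.

```java
import java.util.Objects;

/** Hypothetical helper; not part of Kafka's KafkaConfigBackingStore. */
final class ConfigTopicPartitionCheck {

    /**
     * Fails fast when the Connect config topic does not have exactly one partition.
     * IllegalStateException is a stand-in for the real worker exception type.
     */
    static void requireSinglePartition(String topic, int partitionCount) {
        Objects.requireNonNull(topic, "topic");
        if (partitionCount != 1) {
            throw new IllegalStateException(
                "Topic '" + topic + "' must have exactly 1 partition but has "
                    + partitionCount + "; refusing to start the worker.");
        }
    }

    public static void main(String[] args) {
        // A single-partition topic passes silently.
        requireSinglePartition("connect-configs", 1);
        try {
            // A three-partition topic is rejected with a descriptive message.
            requireSinglePartition("connect-configs", 3);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing before the worker starts makes the misconfiguration visible immediately, instead of surfacing later as repeated "Forcing rebalance" log lines like the one quoted above.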





[GitHub] [kafka] rhauch merged pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

2020-06-10 Thread GitBox


rhauch merged pull request #8828:
URL: https://github.com/apache/kafka/pull/8828


   







[jira] [Updated] (KAFKA-10086) Cooperative Rebalance causes standby state not to be re-used when transitioning to active

2020-06-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Description: 
This ticket was initially just to write an integration test, but I escalated it 
to a blocker and changed the title when the integration test actually surfaced 
two bugs:

1. Offset positions were not reported for in-memory stores,

  was:
This ticket was initially just to write an integration test, but I escalated it 
to a blocker and changed the title when the integration test actually surfaced 
two bugs:

1.


> Cooperative Rebalance causes standby state not to be re-used when 
> transitioning to active
> -
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0, 2.7.0
>
>
> This ticket was initially just to write an integration test, but I escalated 
> it to a blocker and changed the title when the integration test actually 
> surfaced two bugs:
> 1. Offset positions were not reported for in-memory stores,





[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous

2020-06-10 Thread GitBox


kkonstantine commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642386762


   retest this please







[GitHub] [kafka] ableegoldman commented on a change in pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


ableegoldman commented on a change in pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#discussion_r438524798



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
##
@@ -137,7 +137,7 @@ public void resume() {
 public Map prepareCommit() {
 if (state() == State.RUNNING || state() == State.SUSPENDED) {
 stateMgr.flush();
-log.info("Task ready for committing");
+log.debug("Prepared task for committing");

Review comment:
   Just tried to consolidate the log messages between active/standby tasks 
(and demoted these to debug)









[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active

2020-06-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Summary: Standby state isn't always re-used when transitioning to active  
(was: Cooperative Rebalance causes standby state not to be re-used when 
transitioning to active)

> Standby state isn't always re-used when transitioning to active
> ---
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0, 2.7.0
>
>
> This ticket was initially just to write an integration test, but I escalated 
> it to a blocker and changed the title when the integration test actually 
> surfaced two bugs:
> 1. Offset positions were not reported for in-memory stores,





[jira] [Updated] (KAFKA-10086) Cooperative Rebalance causes standby state not to be re-used when transitioning to active

2020-06-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Summary: Cooperative Rebalance causes standby state not to be re-used when 
transitioning to active  (was: Write Integration Test for 
StreamsTaskAssignor/HighAvailabilityTaskAssignor)

> Cooperative Rebalance causes standby state not to be re-used when 
> transitioning to active
> -
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0, 2.7.0
>
>






[jira] [Updated] (KAFKA-10086) Cooperative Rebalance causes standby state not to be re-used when transitioning to active

2020-06-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Description: 
This ticket was initially just to write an integration test, but I escalated it 
to a blocker and changed the title when the integration test actually surfaced 
two bugs:

1.

> Cooperative Rebalance causes standby state not to be re-used when 
> transitioning to active
> -
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0, 2.7.0
>
>
> This ticket was initially just to write an integration test, but I escalated 
> it to a blocker and changed the title when the integration test actually 
> surfaced two bugs:
> 1.





[jira] [Updated] (KAFKA-10086) Write Integration Test for StreamsTaskAssignor/HighAvailabilityTaskAssignor

2020-06-10 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-10086:
-
Priority: Blocker  (was: Critical)

> Write Integration Test for StreamsTaskAssignor/HighAvailabilityTaskAssignor
> ---
>
> Key: KAFKA-10086
> URL: https://issues.apache.org/jira/browse/KAFKA-10086
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Blocker
> Fix For: 2.6.0, 2.7.0
>
>






[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


ableegoldman commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642386255


   Alright, tests/checkstyle/etc are all ready for the builds to be triggered







[GitHub] [kafka] skaundinya15 opened a new pull request #8850: KAFKA-10141: Add more detail to log segment delete messages

2020-06-10 Thread GitBox


skaundinya15 opened a new pull request #8850:
URL: https://github.com/apache/kafka/pull/8850


   As specified in https://issues.apache.org/jira/browse/KAFKA-10141, it would 
be helpful to include as much information as possible when deleting log 
segments. This patch introduces log messages that give more specific details as 
to why the log segment was deleted and the specific metadata regarding that log 
segment.
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Updated] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9066:
-
Fix Version/s: 2.6.0

> Kafka Connect JMX : source & sink task metrics missing for tasks in failed 
> state
> 
>
> Key: KAFKA-9066
> URL: https://issues.apache.org/jira/browse/KAFKA-9066
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.1.1
>Reporter: Mikołaj Stefaniak
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.6.0
>
>
> h2. Overview
> Kafka Connect exposes various metrics via JMX. Those metrics can be exported, 
> e.g. by _Prometheus JMX Exporter_, for further processing.
> One of the crucial attributes is the connector's *task status*.
> According to the official Kafka docs, the status is available as the +status+ 
> attribute of the following MBean:
> {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"
> status - The status of the connector task. One of 'unassigned', 'running', 
> 'paused', 'failed', or 'destroyed'.
> {quote}
> h2. Issue
> Generally +connector-task-metrics+ are exposed properly for tasks in +running+ 
> status but not exposed at all if the task is +failed+.
> A failed task *appears* properly with failed status when queried via *REST API*:
>  
> {code:java}
> $ curl -X GET -u 'user:pass' 
> http://kafka-connect.mydomain.com/connectors/customerconnector/status
> {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException:
>  Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"}
> $ {code}
>  
> A failed task *doesn't appear* as a bean with +connector-task-metrics+ type when 
> queried via *JMX*:
>  
> {code:java}
> $ echo "beans -d kafka.connect" | java -jar 
> target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep 
> connector=customerconnector
> kafka.connect:connector=customerconnector,task=0,type=task-error-metrics
> kafka.connect:connector=customerconnector,type=connector-metrics
> $
> {code}
> h2. Expected result
> It is expected that a bean with +connector-task-metrics+ type will also appear 
> for tasks that failed.
> Below is an example of how beans are properly registered for tasks in Running 
> state:
>  
> {code:java}
> $ echo "get -b 
> kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics
>  status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l 
> localhost:8081 -n -v silent
> status = running;
> $
> {code}
>  
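
The presence check done by hand with jmxterm above can also be expressed with the JDK's `javax.management.ObjectName` pattern matching. The sketch below is an illustration only: `TaskMetricsBeanCheck` and `hasTaskMetricsBean` are hypothetical names, and the bean names are passed in as strings (copied from the outputs quoted in this issue) rather than read from a live MBean server.

```java
import java.util.Arrays;
import java.util.List;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical helper for checking which Connect beans are registered. */
final class TaskMetricsBeanCheck {

    /** Returns true if any listed name is a connector-task-metrics bean of the connector. */
    static boolean hasTaskMetricsBean(List<String> beanNames, String connector) {
        try {
            // Property-list pattern: the trailing ",*" allows extra keys such as task=0.
            ObjectName pattern = new ObjectName(
                "kafka.connect:type=connector-task-metrics,connector=" + connector + ",*");
            for (String name : beanNames) {
                if (pattern.apply(new ObjectName(name))) {
                    return true;
                }
            }
            return false;
        } catch (MalformedObjectNameException e) {
            throw new IllegalArgumentException("Invalid JMX object name", e);
        }
    }

    public static void main(String[] args) {
        // Bean names as reported for the connector with a failed task.
        List<String> failed = Arrays.asList(
            "kafka.connect:connector=customerconnector,task=0,type=task-error-metrics",
            "kafka.connect:connector=customerconnector,type=connector-metrics");
        // Bean name as reported for a connector with a running task.
        List<String> running = Arrays.asList(
            "kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics");

        System.out.println(hasTaskMetricsBean(failed, "customerconnector"));
        System.out.println(hasTaskMetricsBean(running, "sinkConsentSubscription-1000"));
    }
}
```

Against a running worker, the same pattern could be handed to an MBean server query instead of a fixed list; that wiring is left out here to keep the sketch self-contained.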





[GitHub] [kafka] rhauch commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy

2020-06-10 Thread GitBox


rhauch commented on a change in pull request #8828:
URL: https://github.com/apache/kafka/pull/8828#discussion_r438521713



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
##
@@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) {
 return existingTopics;
 }
 
+/**
+ * Verify the named topic uses only compaction for the cleanup policy.
+ *
+ * @param topic the name of the topic
+ * @param workerTopicConfig the name of the worker configuration that 
specifies the topic name
+ * @return true if the admin client could be used to verify the topic 
setting, or false if
+ * the verification could not be performed, likely because the 
admin client principal
+ * did not have the required permissions or because the broker was 
older than 0.11.0.0
+ * @throws ConfigException if the actual topic setting did not match the 
required setting
+ */
+public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String 
workerTopicConfig,
+String topicPurpose) {
+Set cleanupPolicies = topicCleanupPolicy(topic);
+if (cleanupPolicies.isEmpty()) {
+log.debug("Unable to use admin client to verify the cleanup policy 
of '{}' "

Review comment:
   I could see changing this to `info`, because this is important. But the 
other log messages really are just tracking that we're using the admin client 
and what we're finding, so I think `debug` is probably the best there. If the 
cleanup policy is wrong, then if we're logging that we're also going to fail 
the worker; if the cleanup policy is acceptable, I don't think it's worth 
logging it at `info`. 
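
The three outcomes discussed in this review (could not verify, policy acceptable, policy wrong) can be sketched without the admin client. Everything below is a simplified illustration, not the code under review: `CleanupPolicyCheck`, `parsePolicies`, and `verifyOnlyCompact` are hypothetical names, the policy value is passed in as a string, and `IllegalStateException` stands in for the `ConfigException` mentioned in the Javadoc of the diff.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Locale;
import java.util.Set;

/** Hypothetical, admin-client-free sketch of a cleanup.policy check. */
final class CleanupPolicyCheck {

    /** Splits a cleanup.policy value such as "compact,delete" into individual policies. */
    static Set<String> parsePolicies(String rawValue) {
        Set<String> policies = new LinkedHashSet<>();
        if (rawValue == null) {
            return policies;
        }
        for (String part : rawValue.split(",")) {
            String policy = part.trim().toLowerCase(Locale.ROOT);
            if (!policy.isEmpty()) {
                policies.add(policy);
            }
        }
        return policies;
    }

    /**
     * Returns false when the policy could not be determined (empty set),
     * true when it is exactly "compact", and throws otherwise.
     */
    static boolean verifyOnlyCompact(String topic, Set<String> policies) {
        if (policies.isEmpty()) {
            return false; // nothing to verify; the caller decides how loudly to log this
        }
        if (policies.equals(Collections.singleton("compact"))) {
            return true;
        }
        throw new IllegalStateException(
            "Topic '" + topic + "' must use only the 'compact' cleanup policy but uses " + policies);
    }

    public static void main(String[] args) {
        System.out.println(verifyOnlyCompact("connect-offsets", parsePolicies("compact")));
        System.out.println(verifyOnlyCompact("connect-offsets", parsePolicies("")));
        try {
            verifyOnlyCompact("connect-offsets", parsePolicies("compact,delete"));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Only the "could not verify" branch returns quietly, which is why its log level is the one worth debating: the failure branch already stops the worker, and the success branch carries little information.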









[jira] [Resolved] (KAFKA-9653) Duplicate tasks on workers after rebalance

2020-06-10 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9653.
---
Resolution: Duplicate

> Duplicate tasks on workers after rebalance
> --
>
> Key: KAFKA-9653
> URL: https://issues.apache.org/jira/browse/KAFKA-9653
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.2
>Reporter: Agam Brahma
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Verified the following
>  * observed issue goes away when `connect.protocol` is switched from 
> `compatible` to `eager`
>  * Debug logs show `WorkerSourceTask` on two different nodes referencing the 
> same task-id
>  * Debug logs show the node referring to the task as part of both 
> `Configured assignments` and `Lost assignments`





[jira] [Updated] (KAFKA-9653) Duplicate tasks on workers after rebalance

2020-06-10 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9653:
--
Fix Version/s: 2.5.1
   2.4.2
   2.6.0
   2.3.2

> Duplicate tasks on workers after rebalance
> --
>
> Key: KAFKA-9653
> URL: https://issues.apache.org/jira/browse/KAFKA-9653
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.2
>Reporter: Agam Brahma
>Assignee: Konstantine Karantasis
>Priority: Major
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Verified the following
>  * observed issue goes away when `connect.protocol` is switched from 
> `compatible` to `eager`
>  * Debug logs show `WorkerSourceTask` on two different nodes referencing the 
> same task-id
>  * Debug logs show the node referring to the task as part of both 
> `Configured assignments` and `Lost assignments`





[jira] [Commented] (KAFKA-9653) Duplicate tasks on workers after rebalance

2020-06-10 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132876#comment-17132876
 ] 

Konstantine Karantasis commented on KAFKA-9653:
---

Closing this issue as fixed but also duplicate, given that the known underlying 
issues have now been merged. 

> Duplicate tasks on workers after rebalance
> --
>
> Key: KAFKA-9653
> URL: https://issues.apache.org/jira/browse/KAFKA-9653
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.2
>Reporter: Agam Brahma
>Assignee: Konstantine Karantasis
>Priority: Major
>
> Verified the following
>  * observed issue goes away when `connect.protocol` is switched from 
> `compatible` to `eager`
>  * Debug logs show `WorkerSourceTask` on two different nodes referencing the 
> same task-id
>  * Debug logs show the node referring to the task as part of both 
> `Configured assignments` and `Lost assignments`
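
The symptom in the list above, the same task-id referenced on two different nodes, can be detected mechanically from the per-worker assignments. The following is a minimal sketch with hypothetical names (`DuplicateAssignmentFinder`, `findDuplicates`) and made-up worker and task identifiers; it is not the Connect assignor's logic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that reports task IDs claimed by more than one worker. */
final class DuplicateAssignmentFinder {

    /** Maps each task ID held by two or more workers to the workers that reported it. */
    static Map<String, List<String>> findDuplicates(Map<String, List<String>> tasksByWorker) {
        Map<String, List<String>> owners = new TreeMap<>();
        for (Map.Entry<String, List<String>> entry : tasksByWorker.entrySet()) {
            for (String task : entry.getValue()) {
                owners.computeIfAbsent(task, k -> new ArrayList<>()).add(entry.getKey());
            }
        }
        // Keep only tasks with more than one owner.
        owners.values().removeIf(workers -> workers.size() < 2);
        return owners;
    }

    public static void main(String[] args) {
        // Illustrative data only: worker-W still reports a task that moved to worker-1.
        Map<String, List<String>> reported = new LinkedHashMap<>();
        reported.put("worker-1", Arrays.asList("misc-0"));
        reported.put("worker-2", Arrays.asList("misc-4"));
        reported.put("worker-W", Arrays.asList("misc-0", "misc-5"));
        System.out.println(findDuplicates(reported));
    }
}
```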





[jira] [Commented] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment

2020-06-10 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132874#comment-17132874
 ] 

Konstantine Karantasis commented on KAFKA-9841:
---

This fix is now merged. Seems it can make {{2.5.1}} 
Thanks for checking [~vvcephei] and thanks for the contribution [~LucentWong]

> Connector and Task duplicated when a worker join with old generation 
> assignment
> ---
>
> Key: KAFKA-9841
> URL: https://issues.apache.org/jira/browse/KAFKA-9841
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1, 2.4.1
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> When using IncrementalCooperativeAssignor.class to assign connectors and 
> tasks, suppose a worker 'W' has a connection issue with the coordinator.
> During the connection issue, the connectors/tasks on 'W' are assigned to the 
> other workers.
> When the connection issue disappears, 'W' will rejoin the group with an old 
> generation assignment. The group leader then receives duplicated 
> connectors/tasks in the metadata sent by the workers, but the duplicated 
> connectors/tasks are not revoked.
>  
> Generation 3:
> Worker1:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], 
> taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
> with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>  
> Generation 4:
> Worker1:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], 

[GitHub] [kafka] mjsax commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


mjsax commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642381041


   Btw: checkstyle  failed :)







[GitHub] [kafka] mjsax commented on pull request #8847: KAFKA-7833: Add missing test

2020-06-10 Thread GitBox


mjsax commented on pull request #8847:
URL: https://github.com/apache/kafka/pull/8847#issuecomment-642380670


   Merged to `trunk` and cherry-picked to `2.6` branch.







[jira] [Updated] (KAFKA-9845) plugin.path property does not work with config provider

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9845:
-
Fix Version/s: 2.5.1
   2.4.2
   2.6.0

> plugin.path property does not work with config provider
> ---
>
> Key: KAFKA-9845
> URL: https://issues.apache.org/jira/browse/KAFKA-9845
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Minor
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> The config provider mechanism doesn't work if used for the {{plugin.path}} 
> property of a standalone or distributed Connect worker. This is because the 
> {{Plugins}} instance which performs plugin path scanning is created using the 
> raw worker config, pre-transformation (see 
> [ConnectStandalone|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L79]
>  and 
> [ConnectDistributed|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java#L91]).
> Unfortunately, because config providers are loaded as plugins, there's a 
> circular dependency issue here. The {{Plugins}} instance needs to be created 
> _before_ the {{DistributedConfig}}/{{StandaloneConfig}} is created in order 
> for the config providers to be loaded correctly, and the config providers 
> need to be loaded in order to perform their logic on any properties 
> (including the {{plugin.path}} property).
> There is no clear fix for this issue in the code base, and the only known 
> workaround is to refrain from using config providers for the {{plugin.path}} 
> property.
> A couple of improvements could potentially be made to improve the UX when this 
> issue arises:
>  # Alter the config logging performed by the {{DistributedConfig}} and 
> {{StandaloneConfig}} classes to _always_ log the raw value for the 
> {{plugin.path}} property. Right now, the transformed value is logged even 
> though it isn't used, which is likely to cause confusion.
>  # Issue a {{WARN}}- or even {{ERROR}}-level log message when it's detected 
> that the user is attempting to use config providers for the {{plugin.path}} 
> property, which states that config providers cannot be used for that specific 
> property, instructs them to change the value for the property accordingly, 
> and/or informs them of the actual value that the framework will use for that 
> property when performing plugin path scanning.
> We should _not_ throw an error on startup if this condition is detected, as 
> this could cause previously-functioning, benignly-misconfigured Connect 
> workers to fail to start after an upgrade.
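
The second suggestion, warning instead of failing, might look roughly like the sketch below. It assumes placeholders have the `${provider:[path:]key}` shape and uses a deliberately simplified regular expression to spot them; `PluginPathCheck`, `usesConfigProvider`, and `warningFor` are hypothetical names, and the message text is illustrative rather than taken from Kafka.

```java
import java.util.Optional;
import java.util.regex.Pattern;

/** Hypothetical helper that flags config-provider placeholders in plugin.path. */
final class PluginPathCheck {

    // Simplified assumption about the placeholder shape: ${provider:[path:]key}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{[^}:]+:[^}]*\\}");

    /** Returns true if the raw plugin.path value contains something that looks like a placeholder. */
    static boolean usesConfigProvider(String rawPluginPath) {
        return rawPluginPath != null && PLACEHOLDER.matcher(rawPluginPath).find();
    }

    /** Produces a warning message, never an error, so existing workers keep starting. */
    static Optional<String> warningFor(String rawPluginPath) {
        if (!usesConfigProvider(rawPluginPath)) {
            return Optional.empty();
        }
        return Optional.of(
            "Config providers cannot be used for 'plugin.path'; the raw value '"
                + rawPluginPath + "' will be used for plugin path scanning.");
    }

    public static void main(String[] args) {
        System.out.println(warningFor("/usr/share/java,/opt/connectors"));
        System.out.println(warningFor("${file:/etc/connect/secrets.properties:plugin.path}"));
    }
}
```

Returning an `Optional` message rather than throwing mirrors the last point of the description: a worker that is misconfigured but previously started should still start after an upgrade.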





[jira] [Created] (KAFKA-10145) Enhance to support the multiple join operation

2020-06-10 Thread lqjacklee (Jira)
lqjacklee created KAFKA-10145:
-

 Summary: Enhance to support the multiple join operation
 Key: KAFKA-10145
 URL: https://issues.apache.org/jira/browse/KAFKA-10145
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: lqjacklee


Currently a join between two streams is supported, and the relationship 
between the two is clear. However, in some cases the data comes from multiple 
sources/streams, and the relationship between those sources is not well defined. 

For example:

Suppose the end user visits the website or clicks an item he or she is 
interested in. Each time this happens, the system posts an event to a Kafka 
topic. We then calculate the data based on the click stream and the view 
stream. 

1,  Click events come from the click stream
2,  View events come from the view stream
3, 

In the end we only care about the ClickView aggregation domain object. 
When a click event occurs, we update the click event and the aggregation 
object; when a view event occurs, we update the view event and the 
aggregation. 

The ClickView aggregation object will be persisted. Only once the ClickView 
aggregation object has been updated by both the click event and the view event 
will its complete() method return true. 
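
A plain-Java sketch of the aggregation object described here may help pin down the intent. It assumes that complete() is meant to return true only after both a click event and a view event have been applied; the class name `ClickViewAggregation` follows the description, while the method names `onClick` and `onView` and the use of strings as events are hypothetical. No Kafka Streams API is involved.

```java
/** Hypothetical aggregation object fed by a click stream and a view stream. */
final class ClickViewAggregation {

    private String lastClick; // most recent click event, or null if none seen yet
    private String lastView;  // most recent view event, or null if none seen yet

    /** Applies an event from the click stream. */
    ClickViewAggregation onClick(String clickEvent) {
        this.lastClick = clickEvent;
        return this;
    }

    /** Applies an event from the view stream. */
    ClickViewAggregation onView(String viewEvent) {
        this.lastView = viewEvent;
        return this;
    }

    /** True only once both streams have contributed at least one event. */
    boolean complete() {
        return lastClick != null && lastView != null;
    }

    public static void main(String[] args) {
        ClickViewAggregation aggregation = new ClickViewAggregation();
        System.out.println(aggregation.onClick("click:item-1").complete());
        System.out.println(aggregation.onView("view:item-1").complete());
    }
}
```

Because each update method touches only its own field, events from further streams could be added as extra fields and extra conditions in complete() without changing the existing ones.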







[jira] [Updated] (KAFKA-9985) Sink connector consuming DLQ topic may exhaust broker

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9985:
-
Fix Version/s: (was: 2.5.2)
   2.5.1

> Sink connector consuming DLQ topic may exhaust broker
> -
>
> Key: KAFKA-9985
> URL: https://issues.apache.org/jira/browse/KAFKA-9985
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.1, 2.5.0, 2.4.1
>Reporter: Mario Molina
>Assignee: Mario Molina
>Priority: Major
> Fix For: 2.6.0, 2.4.2, 2.5.1
>
>
> When a sink connector is configured with a DLQ whose topic is the same as (or 
> matches) the topic from which the connector reads, the broker and/or 
> connector might be exhausted if a record sent to the topic is invalid.
> Depending on the broker/Connect config, the connector might fail with a 
> RecordTooLargeException before exhausting the broker/connector.
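
The risky configuration can be recognised up front by comparing the DLQ topic with the connector's input topics. The sketch below is an illustration, not the fix applied for this ticket: `DlqTopicCheck` and `dlqFeedsConnector` are hypothetical names, the three arguments correspond to the sink connector settings `errors.deadletterqueue.topic.name`, `topics`, and `topics.regex`, and the regex is assumed to be applied as a full match against the topic name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/** Hypothetical validation for a sink connector's dead letter queue topic. */
final class DlqTopicCheck {

    /**
     * Returns true when records written to the DLQ topic would be consumed
     * again by the same connector, either by name or by regex subscription.
     */
    static boolean dlqFeedsConnector(String dlqTopic, List<String> topics, String topicsRegex) {
        if (dlqTopic == null || dlqTopic.isEmpty()) {
            return false; // no DLQ configured, so no feedback loop
        }
        if (topics != null && topics.contains(dlqTopic)) {
            return true;
        }
        // Assumption: the regex must match the whole topic name.
        return topicsRegex != null
            && !topicsRegex.isEmpty()
            && Pattern.matches(topicsRegex, dlqTopic);
    }

    public static void main(String[] args) {
        System.out.println(dlqFeedsConnector("orders", Arrays.asList("orders", "payments"), null));
        System.out.println(dlqFeedsConnector("orders-dlq", null, "orders.*"));
        System.out.println(dlqFeedsConnector("dlq-orders", Arrays.asList("orders"), "orders.*"));
    }
}
```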





[jira] [Updated] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment

2020-06-10 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis updated KAFKA-9841:
--
Fix Version/s: 2.4.2
   2.6.0
   2.3.2

> Connector and Task duplicated when a worker join with old generation 
> assignment
> ---
>
> Key: KAFKA-9841
> URL: https://issues.apache.org/jira/browse/KAFKA-9841
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.4.0, 2.3.1, 2.4.1
>Reporter: Yu Wang
>Assignee: Yu Wang
>Priority: Major
>  Labels: pull-request-available
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> When using IncrementalCooperativeAssignor.class to assign connectors and 
> tasks, suppose a worker 'W' has a connection issue with the coordinator.
> During the connection issue, the connectors/tasks on 'W' are assigned to the 
> other workers.
> When the connection issue disappears, 'W' will rejoin the group with an old 
> generation assignment. The group leader then receives duplicated 
> connectors/tasks in the metadata sent by the workers, but the duplicated 
> connectors/tasks are not revoked.
>  
> Generation 3:
> Worker1:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker3:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker4:
> [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], 
> taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker5:
> [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', 
> leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], 
> taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} 
> with rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
>  
> Generation 4:
> Worker1:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
> Worker2:
> [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, 
> groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with 
> protocol version 2 and got assignment: Assignment\{error=0, 
> leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', 
> leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], 
> taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with 
> rebalance delay: 0 
> 
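The condition described in the report, where the same connector or task id appears in the assignments reported by more than one worker, can be sketched in plain Java. This is only an illustration: `DuplicateAssignmentSketch` and `duplicatedIds` are hypothetical names, and this is not how `IncrementalCooperativeAssignor` is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Kafka Connect.
final class DuplicateAssignmentSketch {

    // Returns, in sorted order, every id that is reported more than once
    // across the per-worker assignment lists.
    static List<String> duplicatedIds(Map<String, List<String>> idsByWorker) {
        Map<String, Integer> counts = new TreeMap<>();
        for (List<String> ids : idsByWorker.values()) {
            for (String id : ids) {
                counts.merge(id, 1, Integer::sum);
            }
        }
        List<String> duplicates = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            if (entry.getValue() > 1) {
                duplicates.add(entry.getKey());
            }
        }
        return duplicates;
    }
}
```

A leader that computed such a list could, in principle, revoke the listed ids from all but one worker; whether that is the right fix is for the issue to decide.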

[GitHub] [kafka] rhauch merged pull request #8455: KAFKA-9845: Warn users about using config providers with plugin.path property

2020-06-10 Thread GitBox


rhauch merged pull request #8455:
URL: https://github.com/apache/kafka/pull/8455


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #8847: KAFKA-7833: Add missing test

2020-06-10 Thread GitBox


mjsax merged pull request #8847:
URL: https://github.com/apache/kafka/pull/8847


   







[GitHub] [kafka] vvcephei commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective

2020-06-10 Thread GitBox


vvcephei commented on a change in pull request #8818:
URL: https://github.com/apache/kafka/pull/8818#discussion_r438517226



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -86,7 +87,7 @@
 private boolean rebalanceInProgress = false;  // if we are in the middle 
of a rebalance, it is not safe to commit
 
 // includes assigned & initialized tasks and unassigned tasks we locked 
temporarily during rebalance
-private Set lockedTaskDirectories = new HashSet<>();
+private final Set lockedTaskDirectories = new HashSet<>();

Review comment:
   Yeah, I thought that was weird when I added the "final" checkstyle rule. 
Oh well...









[GitHub] [kafka] rhauch merged pull request #8502: KAFKA-9066: Retain metrics for failed tasks

2020-06-10 Thread GitBox


rhauch merged pull request #8502:
URL: https://github.com/apache/kafka/pull/8502


   







[GitHub] [kafka] mjsax commented on pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)

2020-06-10 Thread GitBox


mjsax commented on pull request #8200:
URL: https://github.com/apache/kafka/pull/8200#issuecomment-642377008


   @vitojeng -- Jenkins got locked down a couple of weeks ago. Only committers 
can trigger builds now.







[GitHub] [kafka] vvcephei commented on pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective

2020-06-10 Thread GitBox


vvcephei commented on pull request #8818:
URL: https://github.com/apache/kafka/pull/8818#issuecomment-642376894


   Thanks for the review, @mjsax ; I've addressed your comments.







[GitHub] [kafka] C0urante commented on pull request #8455: KAFKA-9845: Warn users about using config providers with plugin.path property

2020-06-10 Thread GitBox


C0urante commented on pull request #8455:
URL: https://github.com/apache/kafka/pull/8455#issuecomment-642375059


   @rhauch fine by me, applied the suggestions. Thanks for taking a look!







[GitHub] [kafka] vvcephei commented on pull request #8835: MINOR: reduce sizeInBytes for percentiles metrics

2020-06-10 Thread GitBox


vvcephei commented on pull request #8835:
URL: https://github.com/apache/kafka/pull/8835#issuecomment-642375091


   Test this please







[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


ableegoldman commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642374804


   Tests are still a WIP. Just wanted to open it up for input on the problem/fix







[GitHub] [kafka] rhauch merged pull request #8848: MINOR: Fix PluginUtilsTest that resulted from a bad backport

2020-06-10 Thread GitBox


rhauch merged pull request #8848:
URL: https://github.com/apache/kafka/pull/8848


   







[GitHub] [kafka] mjsax commented on a change in pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


mjsax commented on a change in pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#discussion_r438514181



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java
##
@@ -92,7 +92,7 @@ public void createTopics() throws Exception {
 }
 
 @Test
-public void surviveWithOneTaskAsStandby() throws ExecutionException, 
InterruptedException, IOException {
+public void surviveWithOneTaskAsStandby() throws InterruptedException, 
IOException {

Review comment:
   nit: just simplify to `throws Exception`









[GitHub] [kafka] rhauch commented on a change in pull request #8455: KAFKA-9845: Warn users about using config providers with plugin.path property

2020-06-10 Thread GitBox


rhauch commented on a change in pull request #8455:
URL: https://github.com/apache/kafka/pull/8455#discussion_r438513748



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -379,6 +382,22 @@ private void logDeprecatedProperty(String propName, String 
propValue, String def
 }
 }
 
+private void logPluginPathConfigProviderWarning(Map 
rawOriginals) {
+String rawPluginPath = rawOriginals.get(PLUGIN_PATH_CONFIG);
+// Can't use AbstractConfig::originalsStrings here since some values 
may be null, which
+// causes that method to fail
+String transformedPluginPath = 
Objects.toString(originals().get(PLUGIN_PATH_CONFIG));
+if (!Objects.equals(rawPluginPath, transformedPluginPath)) {
+log.warn(
+"Config providers do not work with the plugin.path property. 
The raw value '{}' " 
++ "will be used for plugin scanning, as opposed to the 
transformed value '{}'. " 
++ "See https://issues.apache.org/jira/browse/KAFKA-9845 
for more information.",

Review comment:
   How about:
   ```suggestion
   "Variables cannot be used in the 'plugin.path' property, 
since the property is "
   + "used by plugin scanning before the config providers that 
replace the " 
   + "variables are initialized. The raw value '{}' was used 
for plugin scanning, as " 
   + "opposed to the transformed value '{}', and this may cause 
unexpected results.",
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -205,7 +206,9 @@
 + "plugins and their dependencies\n"
 + "Note: symlinks will be followed to discover dependencies or 
plugins.\n"
 + "Examples: 
plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,"
-+ "/opt/connectors";
++ "/opt/connectors\n" 
++ "Warning: Config providers will not take effect if used for the 
value of this " 
++ "property, and instead the raw, non-transformed value will be 
used.";

Review comment:
   How about:
   ```suggestion
   + "/opt/connectors\n" 
   + "Do not use config provider variables in this property, since 
the raw path is used "
   + "by the worker's scanner before config providers are 
initialized and used to "
   + "replace variables.";
   ```









[GitHub] [kafka] mjsax commented on pull request #8835: MINOR: reduce sizeInBytes for percentiles metrics

2020-06-10 Thread GitBox


mjsax commented on pull request #8835:
URL: https://github.com/apache/kafka/pull/8835#issuecomment-642373516











[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642373459


   Test this please







[GitHub] [kafka] kkonstantine commented on pull request #8848: MINOR: Fix PluginUtilsTest that resulted from a bad backport

2020-06-10 Thread GitBox


kkonstantine commented on pull request #8848:
URL: https://github.com/apache/kafka/pull/8848#issuecomment-642370342


   Thanks @rhauch.







[GitHub] [kafka] whsoul commented on pull request #7965: New Kafka Connect SMT for plainText => Struct(or Map)

2020-06-10 Thread GitBox


whsoul commented on pull request #7965:
URL: https://github.com/apache/kafka/pull/7965#issuecomment-642370021


   @rhauch thanks!
   I already created a JIRA issue 
(https://issues.apache.org/jira/browse/KAFKA-9436)
   but forgot the "needs-kip" label. I have added it now.
   Thanks~







[jira] [Updated] (KAFKA-9436) New Kafka Connect SMT for plainText => Struct(or Map)

2020-06-10 Thread whsoul (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

whsoul updated KAFKA-9436:
--
Labels: needs-kip  (was: )

> New Kafka Connect SMT for plainText => Struct(or Map)
> -
>
> Key: KAFKA-9436
> URL: https://issues.apache.org/jira/browse/KAFKA-9436
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: whsoul
>Priority: Major
>  Labels: needs-kip
>
> I'd like to parse and convert plain text rows to struct (or map) data, and 
> load them into a document database such as MongoDB, Elasticsearch, etc. with an SMT
>  
> For example
>  
> 1. String parse ( with timemillis )
> {code:java}
> {
>"code" : "dev_kafka_pc001_1580372261372"
>,"recode1" : "a"
>,"recode2" : "b" 
> }{code}
> {code:java}
> "transforms": "RegexTransform",
> "transforms.RegexTransform.type": 
> "org.apache.kafka.connect.transforms.ToStructByRegexTransform$Value",
> "transforms.RegexTransform.struct.field": "message",
> "transforms.RegexTransform.regex": 
> "^(.{3,4})_(.*)_(pc|mw|ios|and)([0-9]{3})_([0-9]{13})",
> "transforms.RegexTransform.mapping": 
> "env,serviceId,device,sequence,datetime:TIMEMILLIS"{code}
>  
>  
> 2. plain text apache log
> {code:java}
> "111.61.73.113 - - [08/Aug/2019:18:15:29 +0900] \"OPTIONS 
> /api/v1/service_config HTTP/1.1\" 200 - 101989 \"http://local.test.com/\" 
> \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, 
> like Gecko) Chrome/75.0.3770.142 Safari/537.36\""
> {code}
> The SMT Connect config with the regular expression below can easily transform 
> plain text into struct (or map) data.
>  
> {code:java}
> "transforms": "RegexTransform",
> "transforms.RegexTransform.type": 
> "org.apache.kafka.connect.transforms.ToStructByRegexTransform$Value",
> "transforms.RegexTransform.struct.field": "message",
> "transforms.RegexTransform.regex": "^([\\d.]+) (\\S+) (\\S+) 
> \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(GET|POST|OPTIONS|HEAD|PUT|DELETE|PATCH) 
> (.+?) (.+?)\" (\\d{3}) ([0-9|-]+) ([0-9|-]+) \"([^\"]+)\" \"([^\"]+)\"",
> "transforms.RegexTransform.mapping": 
> "IP,RemoteUser,AuthedRemoteUser,DateTime,Method,Request,Protocol,Response,BytesSent,Ms:NUMBER,Referrer,UserAgent"
> {code}
>  
> I have a PR for this.
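The first example above (capture groups paired with a comma-separated list of field names) can be sketched with plain `java.util.regex`. This is an illustration under stated assumptions, not the SMT from the pull request: `RegexMappingSketch` and `parse` are hypothetical names, and a `:TYPE` suffix such as `:TIMEMILLIS` is simply dropped, so all values stay strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only; this is not the proposed ToStructByRegexTransform.
final class RegexMappingSketch {

    // Pairs capture group i+1 with the i-th name in the comma-separated mapping.
    // Any ":TYPE" suffix on a name is ignored here (an assumption of this sketch).
    static Map<String, String> parse(String regex, String mapping, String text) {
        Matcher matcher = Pattern.compile(regex).matcher(text);
        if (!matcher.find()) {
            throw new IllegalArgumentException("Text does not match: " + text);
        }
        String[] names = mapping.split(",");
        if (names.length != matcher.groupCount()) {
            throw new IllegalArgumentException("Mapping must name every capture group");
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            String name = names[i].split(":", 2)[0].trim();
            fields.put(name, matcher.group(i + 1));
        }
        return fields;
    }

    public static void main(String[] args) {
        System.out.println(parse(
            "^(.{3,4})_(.*)_(pc|mw|ios|and)([0-9]{3})_([0-9]{13})",
            "env,serviceId,device,sequence,datetime:TIMEMILLIS",
            "dev_kafka_pc001_1580372261372"));
    }
}
```

A `LinkedHashMap` keeps the fields in mapping order, which makes the result easy to compare against the mapping string.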



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rhauch edited a comment on pull request #8502: KAFKA-9066: Retain metrics for failed tasks

2020-06-10 Thread GitBox


rhauch edited a comment on pull request #8502:
URL: https://github.com/apache/kafka/pull/8502#issuecomment-642360742


   @C0urante the original PR description doesn't mention that the task metrics 
for a failed task will also be removed when the connector is deleted (in 
addition to the worker stopping or the tasks completing gracefully). Can you 
confirm my understanding, and if I'm right can you please update the 
description?







[GitHub] [kafka] ableegoldman opened a new pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit

2020-06-10 Thread GitBox


ableegoldman opened a new pull request #8849:
URL: https://github.com/apache/kafka/pull/8849


   We need to make sure that corrupted standby tasks are actually cleaned up 
upon a TaskCorruptedException. However, due to the `commit` prior to invoking 
`handleCorruption`, it's possible to throw a TaskMigratedException before 
actually cleaning up any of the corrupted tasks.
   
   This is fine for active tasks since `handleLostAll` will finish up the job, 
but it does nothing with standby tasks. We should make sure that standby tasks 
are handled before attempting to commit (which we can do, since we don't need 
to commit anything for the corrupted standbys)







[jira] [Created] (KAFKA-10144) Corrupted standby tasks are not always cleaned up

2020-06-10 Thread Sophie Blee-Goldman (Jira)
Sophie Blee-Goldman created KAFKA-10144:
---

 Summary: Corrupted standby tasks are not always cleaned up
 Key: KAFKA-10144
 URL: https://issues.apache.org/jira/browse/KAFKA-10144
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Sophie Blee-Goldman
Assignee: Sophie Blee-Goldman
 Fix For: 2.6.0


Thread death on the 2.6-eos-beta soak was due to re-registration of a standby 
task changelog that was already registered. The root cause was that the task 
had been marked corrupted, but `commit` threw a TaskMigratedException before we 
could get to calling TaskManager#handleCorruption and properly clean up the 
task.

For corrupted active tasks this is not a problem, since #handleLostAll will 
then finish the cleanup. But we intentionally don't clear standby tasks on 
TaskMigratedException, leaving the task corrupted and partially registered.





[GitHub] [kafka] rhauch opened a new pull request #8848: MINOR: Fix PluginUtilsTest that resulted from a bad backport

2020-06-10 Thread GitBox


rhauch opened a new pull request #8848:
URL: https://github.com/apache/kafka/pull/8848


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] C0urante commented on pull request #8502: KAFKA-9066: Retain metrics for failed tasks

2020-06-10 Thread GitBox


C0urante commented on pull request #8502:
URL: https://github.com/apache/kafka/pull/8502#issuecomment-642362050


   @rhauch sure, I can update the description to make it clear that we will 
still remove task metrics when a connector is deleted.







[GitHub] [kafka] jiameixie commented on pull request #8845: KAFKA-10126:Add a warning message for ConsumerPerformance

2020-06-10 Thread GitBox


jiameixie commented on pull request #8845:
URL: https://github.com/apache/kafka/pull/8845#issuecomment-642361238


   @abbccdda @chia7712 Thanks for your advice. I have updated it.







[GitHub] [kafka] rhauch commented on pull request #8502: KAFKA-9066: Retain metrics for failed tasks

2020-06-10 Thread GitBox


rhauch commented on pull request #8502:
URL: https://github.com/apache/kafka/pull/8502#issuecomment-642360742


   @C0urante the original description doesn't mention that the task metrics for 
a failed task will also be removed when the connector is deleted (in addition 
to the worker stopping or the tasks completing gracefully). Can you confirm my 
understanding, and if I'm right can you please update the description?







[GitHub] [kafka] C0urante commented on pull request #8502: KAFKA-9066: Retain metrics for failed tasks

2020-06-10 Thread GitBox


C0urante commented on pull request #8502:
URL: https://github.com/apache/kafka/pull/8502#issuecomment-642357185


   @rhauch it'd be nice to verify this in tests but unfortunately I didn't 
notice your comment earlier in the day and I don't think I have time to 
adjust/write new test cases to account for this. It also looks like the 
possibility you pointed out about hindering re-creation of task metrics isn't 
super likely given that tasks proactively clear out any metrics set up for 
older instances on startup: 
https://github.com/apache/kafka/blob/6abb913c6449113141582921340994c0d5e50839/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L352-L353
   
   If this is still a sticking point, could we log a backlog ticket for those 
testing improvements and address it as time permits after the 2.6 release?







[GitHub] [kafka] rhauch commented on a change in pull request #7496: KAFKA-9018: Throw clearer exceptions on serialisation errors

2020-06-10 Thread GitBox


rhauch commented on a change in pull request #7496:
URL: https://github.com/apache/kafka/pull/7496#discussion_r438497632



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -507,6 +508,18 @@ private SinkRecord convertAndTransformRecord(final 
ConsumerRecord msg, 
boolean isKey) {
+try {
+byte[] value = isKey ? msg.key() : msg.value();
+Converter converter = isKey ? keyConverter : valueConverter;
+return converter.toConnectData(msg.topic(), msg.headers(), value);
+} catch (Exception e) {
+log.error("Error converting message {} in topic '{}' partition {} 
at offset {}",
+isKey ? ConverterType.KEY.getName() : 
ConverterType.VALUE.getName(), msg.topic(), msg.partition(), msg.offset());
+throw e;
+}
+}
+

Review comment:
   Since the calling code already knows whether it's a key or value, how 
about just having separate methods? Yeah, they'd be mostly the same, but we 
could avoid the superfluous logic and could simplify things a bit.
   
   Also, would it be better to wrap the exception rather than just log the 
error? Especially with the retry operator, it's possible that the error won't 
get logged near this log message, so we'd lose the correlation.
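The two suggestions in this review comment (separate key and value methods, and wrapping the exception instead of only logging) could look roughly like the sketch below. It is self-contained and does not use the Connect API: `ConversionContextSketch`, `BytesConverter`, and `ConversionException` are hypothetical stand-ins chosen for illustration.

```java
// Hypothetical stand-ins; none of these types come from Kafka Connect.
final class ConversionContextSketch {

    // Carries the record coordinates in the message so the context travels
    // with the exception, wherever it is eventually logged.
    static final class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    interface BytesConverter {
        Object convert(String topic, byte[] data);
    }

    static Object convertKey(BytesConverter keyConverter, String topic,
                             int partition, long offset, byte[] key) {
        try {
            return keyConverter.convert(topic, key);
        } catch (RuntimeException e) {
            throw new ConversionException(String.format(
                "Error converting message key in topic '%s' partition %d at offset %d",
                topic, partition, offset), e);
        }
    }

    static Object convertValue(BytesConverter valueConverter, String topic,
                               int partition, long offset, byte[] value) {
        try {
            return valueConverter.convert(topic, value);
        } catch (RuntimeException e) {
            throw new ConversionException(String.format(
                "Error converting message value in topic '%s' partition %d at offset %d",
                topic, partition, offset), e);
        }
    }
}
```

Because the original exception is kept as the cause, no stack trace is lost, and a retry layer that logs later still sees the topic, partition, and offset.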









[jira] [Updated] (KAFKA-9067) BigDecimal conversion unnecessarily enforces the scale

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9067:
-
Labels: needs-kip  (was: )

> BigDecimal conversion unnecessarily enforces the scale 
> ---
>
> Key: KAFKA-9067
> URL: https://issues.apache.org/jira/browse/KAFKA-9067
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.3.0
>Reporter: Piotr Smolinski
>Priority: Major
>  Labels: needs-kip
>
> In the Kafka Connect schema framework it is possible to use fixed-point decimal 
> numbers mapped as the logical type Decimal. The type is related to the Avro-defined 
> logical type. When the type is used, the scale value is stored in the schema 
> definition (later it might end up in the Avro schema) and the unscaled value is 
> stored as an integer of unbounded size.
> The problem arises when the decimal value to decode has a different scale than 
> the one declared in the schema. During conversion to Avro or JSON using the 
> standard converters, the operation fails with a DataException.
> The proposed solution is to use the setScale method to adapt the scale to the 
> correct value and provide the rounding mode as a parameter to the schema:
> https://docs.oracle.com/javase/8/docs/api/java/math/BigDecimal.html#setScale-int-java.math.RoundingMode-
>  
>  
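A minimal sketch of the proposed adaptation, using only `java.math.BigDecimal`. The class `DecimalScaleSketch`, the helper `adaptScale`, and the choice of `HALF_UP` in the usage are illustrative assumptions; they are not part of Connect's Decimal logical type.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper, not part of Kafka Connect.
final class DecimalScaleSketch {

    // Adapts `value` to the scale declared in the schema instead of rejecting it.
    // With RoundingMode.UNNECESSARY, setScale throws ArithmeticException when
    // digits would have to be discarded, which preserves strict behavior.
    static BigDecimal adaptScale(BigDecimal value, int schemaScale, RoundingMode mode) {
        return value.scale() == schemaScale ? value : value.setScale(schemaScale, mode);
    }

    public static void main(String[] args) {
        BigDecimal incoming = new BigDecimal("12.345"); // scale 3, schema declares 2
        BigDecimal adapted = adaptScale(incoming, 2, RoundingMode.HALF_UP);
        System.out.println(adapted + " unscaled=" + adapted.unscaledValue());
    }
}
```

Increasing the scale never loses information, so only the scale-reducing case actually depends on the rounding mode.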





[jira] [Updated] (KAFKA-9318) Kafka Connect. Add map entry value extraction SMT

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-9318:
-
Labels: needs-kip  (was: )

> Kafka Connect. Add map entry value extraction SMT
> -
>
> Key: KAFKA-9318
> URL: https://issues.apache.org/jira/browse/KAFKA-9318
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.4.0
>Reporter: Piotr Smolinski
>Priority: Major
>  Labels: needs-kip
>
> Currently there is an ExtractField SMT that makes it possible to pull a 
> specific field from generic JSON or a Connect Struct. When a Connect Struct with 
> a map field is used, this approach does not work if a given map entry has to 
> be extracted.
> Example case: a JMS source imports a message that has an entity correlation key 
> inside a message property. We want to use the same key for Kafka messages.
> Currently this requires custom SMT coding, even though the logic is generic.
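The generic logic being requested amounts to a two-step lookup: first the map-typed field, then one entry inside it. The sketch below shows that on plain `java.util.Map` values; `MapEntryExtractSketch`, `extractEntry`, and the field names in the usage are hypothetical, and a real SMT would operate on Connect records rather than raw maps.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration on plain maps; not a Kafka Connect transformation.
final class MapEntryExtractSketch {

    // Returns record[mapField][entryKey], or null when the field is absent,
    // is not a map, or has no such entry.
    static Object extractEntry(Map<String, Object> record, String mapField, String entryKey) {
        Object nested = record.get(mapField);
        if (!(nested instanceof Map)) {
            return null;
        }
        return ((Map<?, ?>) nested).get(entryKey);
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("correlationKey", "order-17");
        Map<String, Object> message = new HashMap<>();
        message.put("properties", properties);
        System.out.println(extractEntry(message, "properties", "correlationKey"));
    }
}
```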





[jira] [Updated] (KAFKA-10115) Incorporate errors.tolerance with the Errant Record Reporter

2020-06-10 Thread Randall Hauch (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch updated KAFKA-10115:
--
Priority: Major  (was: Minor)

> Incorporate errors.tolerance with the Errant Record Reporter
> 
>
> Key: KAFKA-10115
> URL: https://issues.apache.org/jira/browse/KAFKA-10115
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Aakash Shah
>Assignee: Aakash Shah
>Priority: Major
> Fix For: 2.6.0
>
>
> The errors.tolerance config is currently not honored when the Errant Record 
> Reporter is used. If errors.tolerance is none, then the task should fail after 
> sending the errant record to the DLQ in Kafka.





[jira] [Commented] (KAFKA-10115) Incorporate errors.tolerance with the Errant Record Reporter

2020-06-10 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132823#comment-17132823
 ] 

Randall Hauch commented on KAFKA-10115:
---

This is related to KAFKA-9971.

> Incorporate errors.tolerance with the Errant Record Reporter
> 
>
> Key: KAFKA-10115
> URL: https://issues.apache.org/jira/browse/KAFKA-10115
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.6.0
>Reporter: Aakash Shah
>Assignee: Aakash Shah
>Priority: Major
> Fix For: 2.6.0
>
>
> The errors.tolerance config is currently not honored when the Errant Record 
> Reporter is used. If errors.tolerance is none, then the task should fail after 
> sending the errant record to the DLQ in Kafka.





[jira] [Commented] (KAFKA-9314) Connect put() and poll() retries not conforming to KIP-298

2020-06-10 Thread Randall Hauch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132822#comment-17132822
 ] 

Randall Hauch commented on KAFKA-9314:
--

This seems at least related to 
[KIP-610|https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors].
 [~nigel.liang], would you mind taking a look and considering whether that 
change would handle the request here? If not, then would this potentially need 
a KIP to alter the retry behavior? (If so, we need to add the `needs-kip` 
label.)

> Connect put() and poll() retries not conforming to KIP-298
> --
>
> Key: KAFKA-9314
> URL: https://issues.apache.org/jira/browse/KAFKA-9314
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Nigel Liang
>Assignee: Nigel Liang
>Priority: Major
>
> KIP-298 outlines the retry policy of Connect when errors are encountered. In 
> particular, it proposes to retry on {{RetriableException}} on put() in 
> SinkTask and poll() in SourceTask.
> However, the code does not reflect this change. For instance, 
> WorkerSourceTask handles {{RetriableException}} thrown from {{poll()}} by 
> entering into a tight retry loop without backoff. This has led to connectors 
> having to work around it by simply not retrying and always failing the task. 
> Users would need to manually restart the task to recover from even simple 
> network glitches.
> AFAICT from reading the code, the same is true for {{WorkerSinkTask}} when 
> calling {{put()}}.
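For contrast with the tight retry loop described above, a retry loop with capped exponential backoff can be sketched as follows. This is a generic illustration, not the Connect worker code: `BackoffRetrySketch`, `RetriableFailure`, and the parameters `maxAttempts`, `initialDelayMs`, and `maxDelayMs` are hypothetical and do not correspond to actual Connect settings.

```java
import java.util.function.Supplier;

// Generic illustration; not taken from WorkerSourceTask or WorkerSinkTask.
final class BackoffRetrySketch {

    // Stand-in for a retriable error (hypothetical, not Kafka's RetriableException).
    static final class RetriableFailure extends RuntimeException {
        RetriableFailure(String message) {
            super(message);
        }
    }

    // Calls `operation` until it succeeds. After each retriable failure it sleeps,
    // then doubles the delay (capped at maxDelayMs). Once maxAttempts is reached,
    // the last failure is rethrown so the caller can fail the task.
    static <T> T callWithBackoff(Supplier<T> operation, int maxAttempts,
                                 long initialDelayMs, long maxDelayMs) {
        long delayMs = initialDelayMs;
        for (int attempt = 1; ; attempt++) {
            try {
                return operation.get();
            } catch (RetriableFailure e) {
                if (attempt >= maxAttempts) {
                    throw e;
                }
                try {
                    Thread.sleep(delayMs);
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", interrupted);
                }
                delayMs = Math.min(delayMs * 2, maxDelayMs);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = callWithBackoff(() -> {
            if (++calls[0] < 3) {
                throw new RetriableFailure("network glitch");
            }
            return "succeeded after " + calls[0] + " attempts";
        }, 5, 10L, 100L);
        System.out.println(result);
    }
}
```

The sleep between attempts is what separates this from a tight loop: a short network glitch is absorbed without saturating a CPU core, and the task still fails once the attempts are exhausted.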



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs

2020-06-10 Thread GitBox


ableegoldman commented on a change in pull request #8787:
URL: https://github.com/apache/kafka/pull/8787#discussion_r438495057



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##
@@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map clientState
 .flatMap(Collection::stream)
 .collect(Collectors.toList());
 
-final Collection<TopicPartition> allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic()));
+final Set<TopicPartition> preexistingChangelogPartitions = new HashSet<>();
+final Set<TopicPartition> preexistingSourceChangelogPartitions = new HashSet<>();
+final Set<TopicPartition> newlyCreatedChangelogPartitions = new HashSet<>();
+for (final TopicPartition changelog : allChangelogPartitions) {
+    if (newlyCreatedChangelogs.contains(changelog.topic())) {
+        newlyCreatedChangelogPartitions.add(changelog);
+    } else if (optimizedSourceChangelogs.contains(changelog.topic())) {
+        preexistingSourceChangelogPartitions.add(changelog);
+    } else {
+        preexistingChangelogPartitions.add(changelog);
+    }
+}
+
+// Make the listOffsets request first so it can  fetch the offsets for non-source changelogs
+// asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets
+final KafkaFuture<Map<TopicPartition, ListOffsetsResultInfo>> endOffsetsFuture =
+    fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient);
 
-final Collection<TopicPartition> allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions);
-allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions);
+final Map<TopicPartition, Long> sourceChangelogEndOffsets =
+    fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer());
 
-final Map<TopicPartition, ListOffsetsResultInfo> endOffsets =
-    fetchEndOffsets(allPreexistingChangelogPartitions, adminClient);
+final Map<TopicPartition, ListOffsetsResultInfo> endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture);
 
-allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, changelogsByStatefulTask, allNewlyCreatedChangelogPartitions);
+allTaskEndOffsetSums = computeEndOffsetSumsByTask(
+    changelogsByStatefulTask,
+    endOffsets,
+    sourceChangelogEndOffsets,
+    newlyCreatedChangelogPartitions);
 fetchEndOffsetsSuccessful = true;
-} catch (final StreamsException e) {
+} catch (final StreamsException | TimeoutException e) {

Review comment:
   > if you throw an exception in the assignor, it just calls the assignor 
again in a tight loop
   
   Wouldn't the leader thread just die? Not saying that that's ideal, either. 
But it's at least in line with how exceptions thrown by other admin client 
requests in the assignment are currently handled.
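
For readers following the diff, the ordering its code comment describes (start the asynchronous end-offset request first, then make the blocking committed-offsets call, then wait on the future) can be sketched in isolation. This is only an illustration under assumptions: it uses CompletableFuture rather than Kafka's KafkaFuture, String keys rather than TopicPartition, and OverlappedFetch and fetchAll are hypothetical names. Merging both results into one map is also a simplification; the diff passes them to computeEndOffsetSumsByTask separately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Illustrative helper (assumed name): overlaps an asynchronous fetch with a blocking one. */
final class OverlappedFetch {
    private OverlappedFetch() {}

    static Map<String, Long> fetchAll(Supplier<Map<String, Long>> asyncEndOffsets,
                                      Supplier<Map<String, Long>> blockingCommitted) {
        // 1. Start the asynchronous request first so it runs in the background.
        CompletableFuture<Map<String, Long>> future = CompletableFuture.supplyAsync(asyncEndOffsets);

        // 2. Make the blocking call on the current thread while the async request is in flight.
        Map<String, Long> committed = blockingCommitted.get();

        // 3. Only now wait for the asynchronous result; join() rethrows failures unchecked.
        Map<String, Long> merged = new HashMap<>(future.join());
        merged.putAll(committed);
        return merged;
    }
}
```

The total wait is then roughly the longer of the two calls rather than their sum. A failure in the asynchronous part surfaces at join() as a CompletionException, which the caller would still need to handle.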








