[jira] [Updated] (KAFKA-10143) Can no longer change replication throttle with reassignment tool
[ https://issues.apache.org/jira/browse/KAFKA-10143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson updated KAFKA-10143:
------------------------------------
    Description: 
Previously we could use --execute with the --throttle option in order to change the quota of an active reassignment. We seem to have lost this with KIP-455. The code has the following comment:
{code}
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
if (reassignPartitionsInProgress) {
  // Note: older versions of this tool would modify the broker quotas here (but not
  // topic quotas, for some reason). This behavior wasn't documented in the --execute
  // command line help. Since it might interfere with other ongoing reassignments,
  // this behavior was dropped as part of the KIP-455 changes.
  throw new TerseReassignmentFailureException(cannotExecuteBecauseOfExistingMessage)
}
{code}
Seems like it was a mistake to change this because it breaks compatibility. We probably have to revert. At the same time, we can make the intent clearer both in the code and in the command help output.

  was:
Previously we could use --execute with the --throttle option in order to change the quota of an active reassignment. We seem to have lost this with KIP-455. The code has the following comment:
{code}
val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
if (reassignPartitionsInProgress) {
  // Note: older versions of this tool would modify the broker quotas here (but not
  // topic quotas, for some reason). This behavior wasn't documented in the --execute
  // command line help. Since it might interfere with other ongoing reassignments,
  // this behavior was dropped as part of the KIP-455 changes.
  throw new TerseReassignmentFailureException(cannotExecuteBecauseOfExistingMessage)
}
{code}
Seems like it was a mistake to change this because it breaks compatibility. We probably have to revert.
> Can no longer change replication throttle with reassignment tool
>
>
>                 Key: KAFKA-10143
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10143
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Priority: Blocker
>             Fix For: 2.6.0
>
>
> Previously we could use --execute with the --throttle option in order to
> change the quota of an active reassignment. We seem to have lost this with
> KIP-455. The code has the following comment:
> {code}
> val reassignPartitionsInProgress = zkClient.reassignPartitionsInProgress()
> if (reassignPartitionsInProgress) {
>   // Note: older versions of this tool would modify the broker quotas here (but not
>   // topic quotas, for some reason). This behavior wasn't documented in the --execute
>   // command line help. Since it might interfere with other ongoing reassignments,
>   // this behavior was dropped as part of the KIP-455 changes.
>   throw new TerseReassignmentFailureException(cannotExecuteBecauseOfExistingMessage)
> }
> {code}
> Seems like it was a mistake to change this because it breaks compatibility.
> We probably have to revert. At the same time, we can make the intent clearer
> both in the code and in the command help output.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
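For context, a hedged sketch of the invocation this issue describes; the reassignment plan file, the ZooKeeper address, and the throttle value are placeholders, and a real run needs a live cluster, so the command is only assembled and printed here.

```shell
# Placeholders: reassign.json and localhost:2181 are hypothetical.
REASSIGN_JSON="reassign.json"
NEW_THROTTLE=50000000   # bytes/sec

# Pre-KIP-455, re-running --execute with --throttle while a reassignment was
# still in progress updated the replication quota of the active reassignment;
# that is the behavior this issue asks to restore.
CMD="bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file $REASSIGN_JSON --execute --throttle $NEW_THROTTLE"
echo "$CMD"
```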
[GitHub] [kafka] aakashnshah commented on pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter
aakashnshah commented on pull request #8829:
URL: https://github.com/apache/kafka/pull/8829#issuecomment-642418469

   I've updated the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors @rhauch @wicknicks

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[jira] [Resolved] (KAFKA-9216) Enforce connect internal topic configuration at startup
[ https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-9216.
----------------------------------
    Reviewer: Randall Hauch
  Resolution: Fixed

Merged to `trunk` the second PR that enforces the `cleanup.policy` topic setting on Connect's three internal topics, and cherry-picked it to the `2.6` branch (for the upcoming 2.6.0). However, merging to earlier branches would require too many changes in integration tests.

> Enforce connect internal topic configuration at startup
>
>
>                 Key: KAFKA-9216
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9216
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 0.11.0.0
>            Reporter: Randall Hauch
>            Assignee: Evelyn Bayes
>            Priority: Major
>             Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> Users sometimes configure Connect's internal topic for configurations with
> more than one partition. One partition is expected, however, and using more
> than one leads to weird behavior that is sometimes not easy to spot.
> Here's one example of a log message:
> {noformat}
> "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, groupId=td-connect-server] Current config state offset 284 does not match group assignment 274. Forcing rebalance. (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n"
> {noformat}
> Would it be possible to add a check in the KafkaConfigBackingStore and
> prevent the worker from starting if the connect config partition count != 1?
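A hedged sketch of pre-creating Connect's config storage topic with the settings this fix now enforces (compaction, and a single partition for the config topic); the topic name, replication factor, and broker address are placeholders, and the command is only assembled and printed here.

```shell
# Placeholders: connect-configs, replication factor 3, localhost:9092.
CONFIG_TOPIC="connect-configs"

# The config topic must have exactly one partition and cleanup.policy=compact;
# the worker now verifies these settings at startup.
CMD="bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic $CONFIG_TOPIC --partitions 1 --replication-factor 3 --config cleanup.policy=compact"
echo "$CMD"
```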
[jira] [Resolved] (KAFKA-9845) plugin.path property does not work with config provider
[ https://issues.apache.org/jira/browse/KAFKA-9845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-9845.
----------------------------------
    Fix Version/s: 2.7.0
         Reviewer: Randall Hauch
       Resolution: Fixed

Merged to `trunk`, and backported to `2.6` (for upcoming 2.6.0), `2.5` (for upcoming 2.5.1), and `2.4` (for future 2.4.2).

> plugin.path property does not work with config provider
>
>
>                 Key: KAFKA-9845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9845
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1
>            Reporter: Chris Egerton
>            Assignee: Chris Egerton
>            Priority: Minor
>             Fix For: 2.6.0, 2.4.2, 2.5.1, 2.7.0
>
>
> The config provider mechanism doesn't work if used for the {{plugin.path}}
> property of a standalone or distributed Connect worker. This is because the
> {{Plugins}} instance which performs plugin path scanning is created using the
> raw worker config, pre-transformation (see
> [ConnectStandalone|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L79]
> and
> [ConnectDistributed|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java#L91]).
> Unfortunately, because config providers are loaded as plugins, there's a
> circular dependency issue here. The {{Plugins}} instance needs to be created
> _before_ the {{DistributedConfig}}/{{StandaloneConfig}} is created in order
> for the config providers to be loaded correctly, and the config providers
> need to be loaded in order to perform their logic on any properties
> (including the {{plugin.path}} property).
> There is no clear fix for this issue in the code base, and the only known
> workaround is to refrain from using config providers for the {{plugin.path}}
> property.
> A couple improvements could potentially be made to improve the UX when this
> issue arises:
> # Alter the config logging performed by the {{DistributedConfig}} and {{StandaloneConfig}} classes to _always_ log the raw value for the {{plugin.path}} property. Right now, the transformed value is logged even though it isn't used, which is likely to cause confusion.
> # Issue a {{WARN}}- or even {{ERROR}}-level log message when it's detected that the user is attempting to use config providers for the {{plugin.path}} property, which states that config providers cannot be used for that specific property, instructs them to change the value for the property accordingly, and/or informs them of the actual value that the framework will use for that property when performing plugin path scanning.
> We should _not_ throw an error on startup if this condition is detected, as
> this could cause previously-functioning, benignly-misconfigured Connect
> workers to fail to start after an upgrade.
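Given the workaround described above, a worker configuration might look like the following sketch; the provider name, secrets file path, and plugin directory are hypothetical.

```properties
# Config providers still work for every other property:
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# Resolved after plugin scanning, so this is fine (hypothetical secrets file):
ssl.keystore.password=${file:/etc/connect/secrets.properties:keystore.password}

# plugin.path is read before config providers are loaded, so it must be a
# literal value, not a ${...} reference:
plugin.path=/opt/connect/plugins
```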
[jira] [Resolved] (KAFKA-6942) Connect connectors api doesn't show versions of connectors
[ https://issues.apache.org/jira/browse/KAFKA-6942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-6942.
----------------------------------
    Resolution: Invalid

I'm going to close this as INVALID because the versions are available in the API, as noted above.

> Connect connectors api doesn't show versions of connectors
>
>
>                 Key: KAFKA-6942
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6942
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>    Affects Versions: 1.1.0
>            Reporter: Antony Stubbs
>            Priority: Minor
>              Labels: needs-kip
>
>
> Would be very useful to have the connector list API response also return the
> version of the installed connectors.
[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
kkonstantine commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642405060

   retest this please
[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
kkonstantine commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642404708

   ok to test
[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
kkonstantine commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642404597

   retest this please
[GitHub] [kafka] C0urante commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
C0urante commented on pull request #8069:
URL: https://github.com/apache/kafka/pull/8069#issuecomment-642402730

   I believe I've identified the cause of the failure in https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/1048. I've pushed a new commit to fix the logic; it passed tests locally, hopefully Jenkins will be happy with it.
[jira] [Resolved] (KAFKA-10115) Incorporate errors.tolerance with the Errant Record Reporter
[ https://issues.apache.org/jira/browse/KAFKA-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-10115.
-----------------------------------
    Reviewer: Randall Hauch
  Resolution: Fixed

Merged to `2.6` rather than `trunk` (accidentally) and cherry-picked to `trunk`.

> Incorporate errors.tolerance with the Errant Record Reporter
>
>
>                 Key: KAFKA-10115
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10115
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>    Affects Versions: 2.6.0
>            Reporter: Aakash Shah
>            Assignee: Aakash Shah
>            Priority: Major
>             Fix For: 2.6.0
>
>
> The errors.tolerance config is currently not being used when using the Errant
> Record Reporter. If errors.tolerance is none, then the task should fail after
> sending the record to the DLQ in Kafka.
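For reference, a hedged sketch of the connector-level error-handling settings involved; the DLQ topic name and replication factor are placeholders.

```properties
# errors.tolerance=all: records that fail conversion/transformation (or that a
# sink task hands to the errant record reporter) are tolerated and can be
# routed to a dead letter queue. errors.tolerance=none (the default): the task
# fails on the first such error, which is the behavior this fix wires into the
# Errant Record Reporter.
errors.tolerance=all
errors.deadletterqueue.topic.name=my-connector-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
```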
[jira] [Resolved] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state
[ https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Randall Hauch resolved KAFKA-9066.
----------------------------------
    Fix Version/s: 2.7.0
         Reviewer: Randall Hauch
       Resolution: Fixed

Merged to `trunk`, and backported to `2.6` (for upcoming 2.6.0). I'll file a separate issue to backport this to `2.5` (since we're in-progress on releasing 2.5.1) and `2.4`.

> Kafka Connect JMX : source & sink task metrics missing for tasks in failed
> state
>
>
>                 Key: KAFKA-9066
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9066
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.1.1
>            Reporter: Mikołaj Stefaniak
>            Assignee: Chris Egerton
>            Priority: Major
>             Fix For: 2.6.0, 2.7.0
>
>
> h2. Overview
> Kafka Connect exposes various metrics via JMX. Those metrics can be exported,
> e.g., by _Prometheus JMX Exporter_ for further processing.
> One of the crucial attributes is the connector's *task status.*
> According to the official Kafka docs, status is available as the +status+
> attribute of the following MBean:
> {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"
> status - The status of the connector task. One of 'unassigned', 'running',
> 'paused', 'failed', or 'destroyed'.
> {quote}
> h2. Issue
> Generally +connector-task-metrics+ are exposed properly for tasks in +running+
> status but not exposed at all if the task is +failed+.
> Failed Task *appears* properly with failed status when queried via *REST API*:
>
> {code:java}
> $ curl -X GET -u 'user:pass' http://kafka-connect.mydomain.com/connectors/customerconnector/status
> {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException: Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"}
> $ {code}
>
> Failed Task *doesn't appear* as a bean with +connector-task-metrics+ type when
> queried via *JMX*:
>
> {code:java}
> $ echo "beans -d kafka.connect" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep connector=customerconnector
> kafka.connect:connector=customerconnector,task=0,type=task-error-metrics
> kafka.connect:connector=customerconnector,type=connector-metrics
> $
> {code}
> h2. Expected result
> It is expected that a bean with +connector-task-metrics+ type will appear also
> for tasks that failed.
> Below is an example of how beans are properly registered for tasks in the
> Running state:
>
> {code:java}
> $ echo "get -b kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent
> status = running;
> $
> {code}
[GitHub] [kafka] vvcephei commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes
vvcephei commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438533186

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
## @@ -959,28 +992,23 @@ private void maybeRecordE2ELatency(final long recordTimestamp, final long now, f
 }
 }
-/**
- * Request committing the current task's state
- */
-void requestCommit() {
-commitRequested = true;
+public InternalProcessorContext processorContext() {
+return processorContext;
 }
-/**
- * Whether or not a request has been made to commit the current state
- */
-@Override
-public boolean commitRequested() {
-return commitRequested;
+public boolean hasRecordsQueued() {
+return numBuffered() > 0;
 }
+// visible for testing

Review comment:
   Can we avoid these comments? I've come across too many cases where it had become untrue recently, to the point where it just seems pointless to have them. I know you're preserving the comment that was there before while moving the method, but I think we should just clean up the comments as well.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
## @@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
 */
void completeRestoration();
-void addRecords(TopicPartition partition, Iterable> records);
-
-boolean commitNeeded();
-
-/**
- * @throws StreamsException fatal error, should close the thread
- */
-Map prepareCommit();
-
-void postCommit();
-
 void suspend();
/**
- *
 * @throws StreamsException fatal error, should close the thread
 */
void resume();
-/**
- * Must be idempotent.
- */
+void closeDirty();
+
 void closeClean();
-/**
- * Must be idempotent.
- */
-void closeDirty();
+
+// non-idempotent life-cycle methods
/**
- * Updates input partitions and topology after rebalance
+ * Revive a closed task to a created one; should never throw an exception
 */
-void update(final Set topicPartitions, final Map> nodeToSourceTopics);
+void revive();
/**
 * Attempt a clean close but do not close the underlying state
 */
void closeAndRecycleState();
-/**
- * Revive a closed task to a created one; should never throw an exception
- */
-void revive();
-
-StateStore getStore(final String name);
-
-Set inputPartitions();
+void markChangelogAsCorrupted(final Collection partitions);
-/**
- * @return any changelog partitions associated with this task
- */
-Collection changelogPartitions();
-/**
- * @return the offsets of all the changelog partitions associated with this task,
- * indicating the current positions of the logged state stores of the task.
- */
-Map changelogOffsets();
+// runtime methods (using in RUNNING state)
-void markChangelogAsCorrupted(final Collection partitions);
+void addRecords(TopicPartition partition, Iterable> records);
-default Map purgeableOffsets() {
-return Collections.emptyMap();
+default boolean process(final long wallClockTime) {
+return false;
 }
 default void recordProcessBatchTime(final long processBatchTime) {}
 default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
-default boolean process(final long wallClockTime) {
+default boolean maybePunctuateStreamTime() {
 return false;
 }
-default boolean commitRequested() {
+default boolean maybePunctuateSystemTime() {
 return false;
 }
-default boolean maybePunctuateStreamTime() {
+boolean commitNeeded();
+
+default boolean commitRequested() {
 return false;
 }
-default boolean maybePunctuateSystemTime() {
-return false;
+/**
+ * @throws StreamsException fatal error, should close the thread
+ */
+Map prepareCommit();
+
+void postCommit();
+
+default Map purgeableOffsets() {
+return Collections.emptyMap();
 }
+
+// task status inquiry
+
+TaskId id();
+
+State state();
+
+boolean isActive();
+
+/**
+ * Updates input partitions after a rebalance
+ */
+void updateInputPartitions(final Set topicPartitions, final Map> nodeToSourceTopics);
+
+Set inputPartitions();
+
+/**
+ * @return any changelog partitions associated with this task
+ */
+Collection changelogPartitions();
+
+
+// IQ related methods

Review comment:
   I appreciate that you've taken the time to organize these methods, and that these comments are an attempt to make sure they stay organized, but I'm afraid that they'll just become misleading over time, the way that the "for
[GitHub] [kafka] mjsax commented on pull request #8851: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#issuecomment-642396153

   Closing in favor of #8852
[GitHub] [kafka] mjsax closed pull request #8851: MINOR: code cleanup for Kafka Streams task classes
mjsax closed pull request #8851:
URL: https://github.com/apache/kafka/pull/8851
[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533555

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
## @@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
 */
void completeRestoration();
-void addRecords(TopicPartition partition, Iterable> records);
-
-boolean commitNeeded();
-
-/**
- * @throws StreamsException fatal error, should close the thread
- */
-Map prepareCommit();
-
-void postCommit();
-
 void suspend();
/**
- *
 * @throws StreamsException fatal error, should close the thread
 */
void resume();
-/**
- * Must be idempotent.
- */
+void closeDirty();
+
 void closeClean();
-/**
- * Must be idempotent.
- */
-void closeDirty();
+
+// non-idempotent life-cycle methods
/**
- * Updates input partitions and topology after rebalance
+ * Revive a closed task to a created one; should never throw an exception
 */
-void update(final Set topicPartitions, final Map> nodeToSourceTopics);
+void revive();
/**
 * Attempt a clean close but do not close the underlying state
 */
void closeAndRecycleState();
-/**
- * Revive a closed task to a created one; should never throw an exception
- */
-void revive();
-
-StateStore getStore(final String name);
-
-Set inputPartitions();
+void markChangelogAsCorrupted(final Collection partitions);
-/**
- * @return any changelog partitions associated with this task
- */
-Collection changelogPartitions();
-/**
- * @return the offsets of all the changelog partitions associated with this task,
- * indicating the current positions of the logged state stores of the task.
- */
-Map changelogOffsets();
+// runtime methods (using in RUNNING state)
-void markChangelogAsCorrupted(final Collection partitions);
+void addRecords(TopicPartition partition, Iterable> records);
-default Map purgeableOffsets() {
-return Collections.emptyMap();
+default boolean process(final long wallClockTime) {
+return false;
 }
 default void recordProcessBatchTime(final long processBatchTime) {}
 default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}
-default boolean process(final long wallClockTime) {
+default boolean maybePunctuateStreamTime() {
 return false;
 }
-default boolean commitRequested() {
+default boolean maybePunctuateSystemTime() {
 return false;
 }
-default boolean maybePunctuateStreamTime() {
+boolean commitNeeded();
+
+default boolean commitRequested() {
 return false;
 }
-default boolean maybePunctuateSystemTime() {
-return false;
+/**
+ * @throws StreamsException fatal error, should close the thread
+ */
+Map prepareCommit();
+
+void postCommit();
+
+default Map purgeableOffsets() {
+return Collections.emptyMap();
 }
+
+// task status inquiry
+
+TaskId id();
+
+State state();
+
+boolean isActive();
+
+/**
+ * Updates input partitions after a rebalance
+ */
+void updateInputPartitions(final Set topicPartitions, final Map> nodeToSourceTopics);

Review comment:
   Renamed from `update` to `updateInputPartitions`
[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533429

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
## @@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
-TaskId id();

Review comment:
   Group and order methods (compare in-line comments). -- Sub-classes inherit this ordering.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
## @@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
 }
 }
-TaskId id();
-State state();
-boolean isActive();
-
-boolean isClosed();

Review comment:
   This method is unused and removed. That is the only actual change.
[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533374

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
## @@ -57,30 +59,31 @@
 * @param stateDirectory the {@link StateDirectory} created by the thread
 */
StandbyTask(final TaskId id,
-final Set partitions,
 final ProcessorTopology topology,
-final StreamsConfig config,
-final StreamsMetricsImpl metrics,
-final ProcessorStateManager stateMgr,
 final StateDirectory stateDirectory,
+final ProcessorStateManager stateMgr,
+final Set partitions,
+final StreamsConfig config,
+final InternalProcessorContext processorContext,
 final ThreadCache cache,
-final InternalProcessorContext processorContext) {
+final StreamsMetricsImpl metrics) {
 super(id, topology, stateDirectory, stateMgr, partitions);

Review comment:
   Put the "super" parameters first in the constructor parameter list.
[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533205

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
## @@ -16,82 +16,85 @@
 */
package org.apache.kafka.streams.processor.internals;
-import java.util.List;
-import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
public abstract class AbstractTask implements Task {
private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
protected final TaskId id;
+protected final ProcessorTopology topology;
protected final StateDirectory stateDirectory;
protected final ProcessorStateManager stateMgr;
+protected Set inputPartitions;
+
AbstractTask(final TaskId id,
 final ProcessorTopology topology,
 final StateDirectory stateDirectory,
 final ProcessorStateManager stateMgr,
 final Set inputPartitions) {
this.id = id;
-this.stateMgr = stateMgr;
this.topology = topology;
-this.inputPartitions = inputPartitions;
+this.stateMgr = stateMgr;
this.stateDirectory = stateDirectory;
+this.inputPartitions = inputPartitions;

Review comment:
   Align assignment to parameter order.
[GitHub] [kafka] mjsax commented on a change in pull request #8852: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8852:
URL: https://github.com/apache/kafka/pull/8852#discussion_r438533174

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
## @@ -16,82 +16,85 @@
 */
package org.apache.kafka.streams.processor.internals;
-import java.util.List;
-import java.util.Map;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;
import java.util.Collection;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;
public abstract class AbstractTask implements Task {
private Task.State state = CREATED;
-protected Set inputPartitions;
-protected ProcessorTopology topology;
protected final TaskId id;
+protected final ProcessorTopology topology;
protected final StateDirectory stateDirectory;
protected final ProcessorStateManager stateMgr;
+protected Set inputPartitions;

Review comment:
   Align members to constructor parameter order, and group final / mutable.
[GitHub] [kafka] mjsax opened a new pull request #8852: MINOR: code cleanup for Kafka Streams task classes
mjsax opened a new pull request #8852:
URL: https://github.com/apache/kafka/pull/8852

   No functional change; pure code cleanup.

   Call for review @vvcephei
[GitHub] [kafka] brary commented on pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)
brary commented on pull request #8200:
URL: https://github.com/apache/kafka/pull/8200#issuecomment-642394605

   LGTM!
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849:
URL: https://github.com/apache/kafka/pull/8849#issuecomment-642394105

   Test this please
[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531936

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
## @@ -116,89 +112,100 @@ public boolean isValidTransition(final State newState) {
      */
     void completeRestoration();

-    void addRecords(TopicPartition partition, Iterable> records);
-
-    boolean commitNeeded();
-
-    /**
-     * @throws StreamsException fatal error, should close the thread
-     */
-    Map prepareCommit();
-
-    void postCommit();
-
     void suspend();

     /**
-     *
      * @throws StreamsException fatal error, should close the thread
      */
     void resume();

-    /**
-     * Must be idempotent.
-     */
+    void closeDirty();
+
     void closeClean();

-    /**
-     * Must be idempotent.
-     */
-    void closeDirty();
+
+    // non-idempotent life-cycle methods

     /**
-     * Updates input partitions and topology after rebalance
+     * Revive a closed task to a created one; should never throw an exception
      */
-    void update(final Set topicPartitions, final Map> nodeToSourceTopics);
+    void revive();

     /**
      * Attempt a clean close but do not close the underlying state
      */
     void closeAndRecycleState();

-    /**
-     * Revive a closed task to a created one; should never throw an exception
-     */
-    void revive();
-
-    StateStore getStore(final String name);
-
-    Set inputPartitions();
+    void markChangelogAsCorrupted(final Collection partitions);

-    /**
-     * @return any changelog partitions associated with this task
-     */
-    Collection changelogPartitions();
-    /**
-     * @return the offsets of all the changelog partitions associated with this task,
-     * indicating the current positions of the logged state stores of the task.
-     */
-    Map changelogOffsets();
+
+    // runtime methods (using in RUNNING state)

-    void markChangelogAsCorrupted(final Collection partitions);
+    void addRecords(TopicPartition partition, Iterable> records);

-    default Map purgeableOffsets() {
-        return Collections.emptyMap();
+    default boolean process(final long wallClockTime) {
+        return false;
     }

     default void recordProcessBatchTime(final long processBatchTime) {}

     default void recordProcessTimeRatioAndBufferSize(final long allTaskProcessMs, final long now) {}

-    default boolean process(final long wallClockTime) {
+    default boolean maybePunctuateStreamTime() {
         return false;
     }

-    default boolean commitRequested() {
+    default boolean maybePunctuateSystemTime() {
         return false;
     }

-    default boolean maybePunctuateStreamTime() {
+    boolean commitNeeded();
+
+    default boolean commitRequested() {
         return false;
     }

-    default boolean maybePunctuateSystemTime() {
-        return false;
+    /**
+     * @throws StreamsException fatal error, should close the thread
+     */
+    Map prepareCommit();
+
+    void postCommit();
+
+    default Map purgeableOffsets() {
+        return Collections.emptyMap();
     }
+
+    // task status inquiry
+
+    TaskId id();
+
+    State state();
+
+    boolean isActive();
+
+    /**
+     * Updates input partitions after a rebalance
+     */
+    void updateInputPartitions(final Set topicPartitions, final Map> nodeToSourceTopics);

Review comment:
   Renamed from `update` to `updateInputPartitions`
[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531698

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
## @@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
         }
     }

-    TaskId id();
-    State state();
-    boolean isActive();
-
-    boolean isClosed();

Review comment:
   This method is unused and removed. That is the only actual change.
[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531571

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
## @@ -97,13 +97,9 @@ public boolean isValidTransition(final State newState) {
         }
     }

-    TaskId id();

Review comment:
   Group and order methods (compare in-line comments). -- Sub-classes inherit this ordering.
[GitHub] [kafka] brary commented on a change in pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)
brary commented on a change in pull request #8200:
URL: https://github.com/apache/kafka/pull/8200#discussion_r438531599

## File path: streams/src/main/java/org/apache/kafka/streams/errors/InvalidStateStoreException.java
## @@ -18,13 +18,11 @@

 /**
- * Indicates that there was a problem when trying to access a
- * {@link org.apache.kafka.streams.processor.StateStore StateStore}, i.e, the Store is no longer valid because it is
- * closed or doesn't exist any more due to a rebalance.
- *
- * These exceptions may be transient, i.e., during a rebalance it won't be possible to query the stores as they are
- * being (re)-initialized. Once the rebalance has completed the stores will be available again. Hence, it is valid
- * to backoff and retry when handling this exception.
+ * Indicates that there was a problem when trying to access a {@link org.apache.kafka.streams.processor.StateStore StateStore}.
+ * InvalidStateStoreException not thrown directly but only following sub-classes:
+ * {@link StreamsNotStartedException}, {@link StreamsRebalancingException},
+ * {@link StateStoreMigratedException}, {@link StateStoreNotAvailableException},
+ * {@link UnknownStateStoreException}, {@link InvalidStateStorePartitionException}
  */
 public class InvalidStateStoreException extends StreamsException {

Review comment:
   Sure @vitojeng . cc @mjsax
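The rewritten javadoc enumerates concrete sub-classes so that interactive-query callers can distinguish transient conditions (back off and retry) from fatal ones. A minimal, self-contained sketch of that caller-side dispatch, using stand-in exception classes defined locally (the real ones live in org.apache.kafka.streams.errors):

```java
public class IqRetrySketch {
    // Stand-ins for the real Streams exceptions referenced in the javadoc above;
    // defined locally so this sketch compiles without the kafka-streams jar.
    static class InvalidStateStoreException extends RuntimeException {}
    // Transient: the store will come back once the rebalance completes.
    static class StreamsRebalancingException extends InvalidStateStoreException {}
    // Fatal: the store name does not exist in the topology.
    static class UnknownStateStoreException extends InvalidStateStoreException {}

    // Returns true if the query should be retried after a backoff.
    static boolean isTransient(final InvalidStateStoreException e) {
        return e instanceof StreamsRebalancingException;
    }

    public static void main(final String[] args) {
        System.out.println(isTransient(new StreamsRebalancingException())); // true
        System.out.println(isTransient(new UnknownStateStoreException()));  // false
    }
}
```

The design point of the PR is exactly this: callers no longer have to guess from a single exception type whether a backoff-and-retry loop is appropriate.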
[jira] [Commented] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment
[ https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132901#comment-17132901 ] Yu Wang commented on KAFKA-9841: Thank you for checking [~vvcephei] and thank you for your help [~kkonstantine]. > Connector and Task duplicated when a worker join with old generation > assignment > --- > > Key: KAFKA-9841 > URL: https://issues.apache.org/jira/browse/KAFKA-9841 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1, 2.4.1 >Reporter: Yu Wang >Assignee: Yu Wang >Priority: Major > Labels: pull-request-available > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > When using IncrementalCooperativeAssignor.class to assign connectors and > tasks, suppose a worker 'W' has a connection issue with the coordinator. > During the connection issue, the connectors/tasks on 'W' are assigned to the > other workers. > When the connection issue disappears, 'W' will rejoin the group with an > old-generation assignment. The group leader will then see duplicated > connectors/tasks in the metadata sent by the workers, but the duplicated > connectors/tasks will not be revoked. 
> > Generation 3: > Worker1: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker3: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker4: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], > taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > 
Worker5: > [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} > with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > > Generation 4: > Worker1: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 >
[GitHub] [kafka] mjsax commented on a change in pull request #8851: MINOR: code cleanup for Kafka Streams task classes
mjsax commented on a change in pull request #8851:
URL: https://github.com/apache/kafka/pull/8851#discussion_r438531045

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
## @@ -16,82 +16,85 @@
  */
 package org.apache.kafka.streams.processor.internals;

-import java.util.List;
-import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
+import org.slf4j.Logger;

 import java.util.Collection;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
-import org.slf4j.Logger;

 import static org.apache.kafka.streams.processor.internals.Task.State.CLOSED;
 import static org.apache.kafka.streams.processor.internals.Task.State.CREATED;

 public abstract class AbstractTask implements Task {
     private Task.State state = CREATED;

-    protected Set inputPartitions;
-    protected ProcessorTopology topology;
     protected final TaskId id;
+    protected final ProcessorTopology topology;
     protected final StateDirectory stateDirectory;
     protected final ProcessorStateManager stateMgr;
+    protected Set inputPartitions;

Review comment:
   Align members to constructor parameter order, and group `final` / mutable.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
## @@ -16,82 +16,85 @@
     protected final ProcessorStateManager stateMgr;
+    protected Set inputPartitions;
+
     AbstractTask(final TaskId id,
                  final ProcessorTopology topology,
                  final StateDirectory stateDirectory,
                  final ProcessorStateManager stateMgr,
                  final Set inputPartitions) {
         this.id = id;
-        this.stateMgr = stateMgr;
         this.topology = topology;
-        this.inputPartitions = inputPartitions;
+        this.stateMgr = stateMgr;
         this.stateDirectory = stateDirectory;
+        this.inputPartitions = inputPartitions;

Review comment:
   Align assignment to parameter order.
[jira] [Commented] (KAFKA-9270) KafkaStream crash on offset commit failure
[ https://issues.apache.org/jira/browse/KAFKA-9270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132899#comment-17132899 ] Rohan Kulkarni commented on KAFKA-9270: --- [~mjsax] - Sure. I will track the other issue KAFKA-9274 > KafkaStream crash on offset commit failure > -- > > Key: KAFKA-9270 > URL: https://issues.apache.org/jira/browse/KAFKA-9270 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: Rohan Kulkarni >Priority: Critical > > On our Production server we intermittently observe Kafka Streams get crashed > with TimeoutException while committing offset. The only workaround seems to > be restarting the application which is not a desirable solution for a > production environment. > > While have already implemented ProductionExceptionHandler which does not > seems to address this. > > Please provide a fix for this or a viable workaround. > > +Application side logs:+ > 2019-11-17 08:28:48.055 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > org.apache.kafka.streams.processor.internals.AssignedStreamsTasks > [org.apache.kafka.streams.processor.internals.AssignedTasks:applyToRunningTasks:373] > - stream-thread > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] *Failed to > commit stream task 0_1 due to the following error:* > *org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-1=OffsetAndMetadata{offset=176729402, metadata=''}} > > 2019-11-17 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-12019-11-17 > 08:29:00.891 + > [AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1] [ERROR] - > [:lambda$init$2:130] - Stream crashed!!! 
StreamsThread threadId: > AggregateJob-614fe688-c9a4-4dad-a881-71488030918b-StreamThread-1TaskManager > MetadataState: GlobalMetadata: [] GlobalStores: [] My HostInfo: > HostInfo\{host='unknown', port=-1} Cluster(id = null, nodes = [], partitions > = [], controller = null) Active tasks: Running: Suspended: Restoring: New: > Standby tasks: Running: Suspended: Restoring: New: > org.apache.kafka.common.errors.*TimeoutException: Timeout of 6ms expired > before successfully committing offsets* > \{AggregateJob-0=OffsetAndMetadata{offset=189808059, metadata=''}} > > +Kafka broker logs:+ > [2019-11-17 13:53:22,774] WARN *Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f* > (org.apache.zookeeper.ClientCnxn) > [2019-11-17 13:53:22,809] INFO Client session timed out, have not heard from > server in 6669ms for sessionid 0x10068e4a2944c2f, closing socket connection > and attempting reconnect (org.apache.zookeeper.ClientCnxn) > > Regards, > Rohan -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10146) Backport KAFKA-9066 to 2.5 and 2.4 branches
Randall Hauch created KAFKA-10146: - Summary: Backport KAFKA-9066 to 2.5 and 2.4 branches Key: KAFKA-10146 URL: https://issues.apache.org/jira/browse/KAFKA-10146 Project: Kafka Issue Type: Bug Components: KafkaConnect Reporter: Randall Hauch Assignee: Randall Hauch Fix For: 2.4.2, 2.5.2 KAFKA-9066 was merged on the same day we were trying to release 2.5.1, so this was not backported at the time. However, once 2.5.1 is out the door, the `775f0d484` commit on `trunk` should be backported to the `2.5` and `2.4` branches. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mjsax opened a new pull request #8851: MINOR: code cleanup for Kafka Streams task classes
mjsax opened a new pull request #8851: URL: https://github.com/apache/kafka/pull/8851 No functional change; pure code cleanup. Call for review @vvcephei
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642392428 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391750 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391773
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391327 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391433 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642391252 Test this please
[GitHub] [kafka] C0urante commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
C0urante commented on pull request #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-642390162 Fixed the merge conflicts; should be safe to do a new test run now.
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642388373 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642388044 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642387968 Test this please
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642387910 Test this please
[GitHub] [kafka] rhauch merged pull request #8829: KAFKA-10115: Incorporate errors.tolerance with the Errant Record Reporter
rhauch merged pull request #8829: URL: https://github.com/apache/kafka/pull/8829
[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Description: This ticket was initially just to write an integration test, but I escalated it to a blocker and changed the title when the integration test actually surfaced two bugs: # Offset positions were not reported for in-memory stores, so tasks with in-memory stores would never be considered as "caught up" and could not take over active processing, preventing clusters from ever achieving balance. This is a regression in 2.6 # When the TaskAssignor decided to switch active processing from a former owner to a new one that had a standby, the lower-level cooperative rebalance protocol would first de-schedule the task completely, and only later would assign it to the new owner. For in-memory stores, this causes the standby state not to be re-used, and for persistent stores, it creates a window in which the cleanup thread might delete the state directory. In both cases, even though the instance previously had a standby, once it gets the active, it still had to restore the entire state from the changelog. was: This ticket was initially just to write an integration test, but I escalated it to a blocker and changed the title when the integration test actually surfaced two bugs: 1. 
Offset positions were not reported for in-memory stores, > Standby state isn't always re-used when transitioning to active > --- > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0, 2.7.0 > > > This ticket was initially just to write an integration test, but I escalated > it to a blocker and changed the title when the integration test actually > surfaced two bugs: > # Offset positions were not reported for in-memory stores, so tasks with > in-memory stores would never be considered as "caught up" and could not take > over active processing, preventing clusters from ever achieving balance. This > is a regression in 2.6 > # When the TaskAssignor decided to switch active processing from a former > owner to a new one that had a standby, the lower-level cooperative rebalance > protocol would first de-schedule the task completely, and only later would > assign it to the new owner. For in-memory stores, this causes the standby > state not to be re-used, and for persistent stores, it creates a window in > which the cleanup thread might delete the state directory. In both cases, > even though the instance previously had a standby, once it gets the active, > it still had to restore the entire state from the changelog. -- This message was sent by Atlassian Jira (v8.3.4#803005)
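The first bug described in this ticket comes down to a "caught up" check: an instance may only take over active processing for a task if its standby's reported changelog lag is within the acceptable recovery lag. If a store never reports an offset position (the in-memory store regression), its lag is effectively unbounded and the instance is never eligible. A hedged sketch of that comparison, with illustrative names rather than the actual Streams assignor API (the real threshold is governed by the `acceptable.recovery.lag` config):

```java
public class CaughtUpCheck {
    // Sentinel for "this store never reported an offset position", as happened
    // with in-memory stores in the bug above. Name and value are illustrative.
    static final long UNKNOWN_LAG = Long.MAX_VALUE;

    // An instance is "caught up" on a task only if a position was reported and
    // the remaining changelog lag is within the acceptable recovery lag.
    static boolean caughtUp(final long reportedLag, final long acceptableRecoveryLag) {
        return reportedLag != UNKNOWN_LAG && reportedLag <= acceptableRecoveryLag;
    }

    public static void main(final String[] args) {
        System.out.println(caughtUp(5_000L, 10_000L));       // true: small lag
        System.out.println(caughtUp(UNKNOWN_LAG, 10_000L));  // false: no position reported
    }
}
```

With the bug, every in-memory-store task fell into the `UNKNOWN_LAG` branch, so the assignor could never move active processing to a warmed-up standby and the cluster never converged to balance.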
[GitHub] [kafka] vitojeng commented on pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)
vitojeng commented on pull request #8200: URL: https://github.com/apache/kafka/pull/8200#issuecomment-642387584 Got it!
[jira] [Commented] (KAFKA-9216) Enforce connect internal topic configuration at startup
[ https://issues.apache.org/jira/browse/KAFKA-9216?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132887#comment-17132887 ] Randall Hauch commented on KAFKA-9216: -- Thanks, [~ChrisEgerton]. I think we came to consensus on the PR by improving the error message with better instructions. > Enforce connect internal topic configuration at startup > --- > > Key: KAFKA-9216 > URL: https://issues.apache.org/jira/browse/KAFKA-9216 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.11.0.0 >Reporter: Randall Hauch >Assignee: Evelyn Bayes >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > Users sometimes configure Connect's internal topic for configurations with > more than one partition. One partition is expected, however, and using more > than one leads to weird behavior that is sometimes not easy to spot. > Here's one example of a log message: > {noformat} > "textPayload": "[2019-11-20 11:12:14,049] INFO [Worker clientId=connect-1, > groupId=td-connect-server] Current config state offset 284 does not match > group assignment 274. Forcing rebalance. > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:942)\n" > {noformat} > Would it be possible to add a check in the KafkaConfigBackingStore and > prevent the worker from starting if connect config partition count !=1 ? -- This message was sent by Atlassian Jira (v8.3.4#803005)
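The check this ticket asks for can be sketched as a small fail-fast validation at worker startup: refuse to start if the internal config topic has more than one partition. A minimal sketch with hypothetical helper names, not the actual KafkaConfigBackingStore code (which would obtain the partition count and topic configs via the admin client):

```java
public class InternalTopicCheck {
    // Fail fast if Connect's internal config topic is misconfigured.
    // KAFKA-9216: more than one partition breaks config-offset ordering, and a
    // non-compact cleanup policy can silently delete connector configs.
    static void validateConfigTopic(final int partitionCount, final String cleanupPolicy) {
        if (partitionCount != 1) {
            throw new IllegalStateException(
                "Connect config topic must have exactly 1 partition, found " + partitionCount);
        }
        if (!"compact".equals(cleanupPolicy)) {
            throw new IllegalStateException(
                "Connect config topic must use cleanup.policy=compact, found " + cleanupPolicy);
        }
    }

    public static void main(final String[] args) {
        validateConfigTopic(1, "compact"); // passes silently
        try {
            validateConfigTopic(25, "delete");
        } catch (final IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at startup turns the "weird behavior that is sometimes not easy to spot" (forced rebalances, config-offset mismatches) into one clear error message, which is the consensus the comment above describes.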
[GitHub] [kafka] rhauch merged pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy
rhauch merged pull request #8828: URL: https://github.com/apache/kafka/pull/8828
[jira] [Updated] (KAFKA-10086) Cooperative Rebalance causes standby state not to be re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Description: This ticket was initially just to write an integration test, but I escalated it to a blocker and changed the title when the integration test actually surfaced two bugs: 1. Offset positions were not reported for in-memory stores, was: This ticket was initially just to write an integration test, but I escalated it to a blocker and changed the title when the integration test actually surfaced two bugs: 1. > Cooperative Rebalance causes standby state not to be re-used when > transitioning to active > - > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0, 2.7.0 > > > This ticket was initially just to write an integration test, but I escalated > it to a blocker and changed the title when the integration test actually > surfaced two bugs: > 1. Offset positions were not reported for in-memory stores, -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] kkonstantine commented on pull request #8069: KAFKA-9374: Make connector interactions asynchronous
kkonstantine commented on pull request #8069: URL: https://github.com/apache/kafka/pull/8069#issuecomment-642386762 retest this please
[GitHub] [kafka] ableegoldman commented on a change in pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
ableegoldman commented on a change in pull request #8849: URL: https://github.com/apache/kafka/pull/8849#discussion_r438524798 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java ## @@ -137,7 +137,7 @@ public void resume() { public Map prepareCommit() { if (state() == State.RUNNING || state() == State.SUSPENDED) { stateMgr.flush(); -log.info("Task ready for committing"); +log.debug("Prepared task for committing"); Review comment: Just tried to consolidate the log messages between active/standby tasks (and demoted these to debug)
[jira] [Updated] (KAFKA-10086) Standby state isn't always re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Summary: Standby state isn't always re-used when transitioning to active (was: Cooperative Rebalance causes standby state not to be re-used when transitioning to active) > Standby state isn't always re-used when transitioning to active > --- > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0, 2.7.0 > > > This ticket was initially just to write an integration test, but I escalated > it to a blocker and changed the title when the integration test actually > surfaced two bugs: > 1. Offset positions were not reported for in-memory stores,
[jira] [Updated] (KAFKA-10086) Cooperative Rebalance causes standby state not to be re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Summary: Cooperative Rebalance causes standby state not to be re-used when transitioning to active (was: Write Integration Test for StreamsTaskAssignor/HighAvailabilityTaskAssignor) > Cooperative Rebalance causes standby state not to be re-used when > transitioning to active > - > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0, 2.7.0 > >
[jira] [Updated] (KAFKA-10086) Cooperative Rebalance causes standby state not to be re-used when transitioning to active
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Description: This ticket was initially just to write an integration test, but I escalated it to a blocker and changed the title when the integration test actually surfaced two bugs: 1. > Cooperative Rebalance causes standby state not to be re-used when > transitioning to active > - > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0, 2.7.0 > > > This ticket was initially just to write an integration test, but I escalated > it to a blocker and changed the title when the integration test actually > surfaced two bugs: > 1.
[jira] [Updated] (KAFKA-10086) Write Integration Test for StreamsTaskAssignor/HighAvailabilityTaskAssignor
[ https://issues.apache.org/jira/browse/KAFKA-10086?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-10086: - Priority: Blocker (was: Critical) > Write Integration Test for StreamsTaskAssignor/HighAvailabilityTaskAssignor > --- > > Key: KAFKA-10086 > URL: https://issues.apache.org/jira/browse/KAFKA-10086 > Project: Kafka > Issue Type: Task > Components: streams >Affects Versions: 2.6.0, 2.7.0 >Reporter: John Roesler >Assignee: John Roesler >Priority: Blocker > Fix For: 2.6.0, 2.7.0 > >
[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
ableegoldman commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642386255 Alright, tests/checkstyle/etc are all ready for the builds to be triggered
[GitHub] [kafka] skaundinya15 opened a new pull request #8850: KAFKA-10141: Add more detail to log segment delete messages
skaundinya15 opened a new pull request #8850: URL: https://github.com/apache/kafka/pull/8850 As specified in https://issues.apache.org/jira/browse/KAFKA-10141, it would be helpful to include as much information as possible when deleting log segments. This patch introduces log messages that give more specific details as to why the log segment was deleted and the specific metadata regarding that log segment. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[jira] [Updated] (KAFKA-9066) Kafka Connect JMX : source & sink task metrics missing for tasks in failed state
[ https://issues.apache.org/jira/browse/KAFKA-9066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9066: - Fix Version/s: 2.6.0 > Kafka Connect JMX : source & sink task metrics missing for tasks in failed > state > > > Key: KAFKA-9066 > URL: https://issues.apache.org/jira/browse/KAFKA-9066 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.1.1 >Reporter: Mikołaj Stefaniak >Assignee: Chris Egerton >Priority: Major > Fix For: 2.6.0 > > > h2. Overview > Kafka Connect exposes various metrics via JMX. Those metrics can be exported > i.e. by _Prometheus JMX Exporter_ for further processing. > One of the crucial attributes is the connector's *task status.* > According to the official Kafka docs, status is available as the +status+ attribute > of the following MBean: > {quote}kafka.connect:type=connector-task-metrics,connector="\{connector}",task="\{task}"status > - The status of the connector task. One of 'unassigned', 'running', > 'paused', 'failed', or 'destroyed'. > {quote} > h2. Issue > Generally +connector-task-metrics+ are exposed properly for tasks in +running+ > status but not exposed at all if the task is +failed+. 
> Failed Task *appears* properly with failed status when queried via *REST API*: > > {code:java} > $ curl -X GET -u 'user:pass' > http://kafka-connect.mydomain.com/connectors/customerconnector/status > {"name":"customerconnector","connector":{"state":"RUNNING","worker_id":"kafka-connect.mydomain.com:8080"},"tasks":[{"id":0,"state":"FAILED","worker_id":"kafka-connect.mydomain.com:8080","trace":"org.apache.kafka.connect.errors.ConnectException: > Received DML 'DELETE FROM mysql.rds_sysinfo .."}],"type":"source"} > $ {code} > > Failed Task *doesn't appear* as bean with +connector-task-metrics+ type when > queried via *JMX*: > > {code:java} > $ echo "beans -d kafka.connect" | java -jar > target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l localhost:8081 -n -v silent | grep > connector=customerconnector > kafka.connect:connector=customerconnector,task=0,type=task-error-metricskafka.connect:connector=customerconnector,type=connector-metrics > $ > {code} > h2. Expected result > It is expected that a bean with +connector-task-metrics+ type will also appear > for tasks that failed. > Below is an example of how beans are properly registered for tasks in Running > state: > > {code:java} > $ echo "get -b > kafka.connect:connector=sinkConsentSubscription-1000,task=0,type=connector-task-metrics > status" | java -jar target/jmxterm-1.1.0-SNAPSHOT-uber.jar -l > localhost:8081 -n -v silent > status = running; > $ > {code} >
[GitHub] [kafka] rhauch commented on a change in pull request #8828: KAFKA-9216: Enforce that Connect’s internal topics use `compact` cleanup policy
rhauch commented on a change in pull request #8828: URL: https://github.com/apache/kafka/pull/8828#discussion_r438521713 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java ## @@ -375,6 +383,162 @@ public boolean createTopic(NewTopic topic) { return existingTopics; } +/** + * Verify the named topic uses only compaction for the cleanup policy. + * + * @param topic the name of the topic + * @param workerTopicConfig the name of the worker configuration that specifies the topic name + * @return true if the admin client could be used to verify the topic setting, or false if + * the verification could not be performed, likely because the admin client principal + * did not have the required permissions or because the broker was older than 0.11.0.0 + * @throws ConfigException if the actual topic setting did not match the required setting + */ +public boolean verifyTopicCleanupPolicyOnlyCompact(String topic, String workerTopicConfig, +String topicPurpose) { +Set cleanupPolicies = topicCleanupPolicy(topic); +if (cleanupPolicies.isEmpty()) { +log.debug("Unable to use admin client to verify the cleanup policy of '{}' " Review comment: I could see changing this to `info`, because this is important. But the other log messages really are just tracking that we're using the admin client and what we're finding, so I think `debug` is probably the best there. If the cleanup policy is wrong, then if we're logging that we're also going to fail the worker; if the cleanup policy is acceptable, I don't think it's worth logging it at `info`.
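The return/throw contract described in the javadoc above boils down to a small decision table: no policy information means the check is skipped, compact-only passes, and anything else fails the worker. A minimal illustrative sketch of that logic (not Connect's actual `TopicAdmin` code; the class name and exception type are hypothetical stand-ins):

```java
import java.util.Set;

// Illustrative sketch of the verification contract quoted above; this is not
// Connect's real implementation, just the decision table from the javadoc.
public class CleanupPolicyCheck {
    /**
     * @return true if the topic could be verified as compact-only; false if no
     *         policy information was available (e.g. missing ACLs or an older
     *         broker), in which case the caller logs at debug and continues
     * @throws IllegalStateException if any policy besides "compact" is in use
     */
    public static boolean verifyCompactOnly(String topic, Set<String> cleanupPolicies) {
        if (cleanupPolicies.isEmpty()) {
            return false; // verification could not be performed
        }
        if (cleanupPolicies.size() == 1 && cleanupPolicies.contains("compact")) {
            return true; // topic uses only compaction
        }
        throw new IllegalStateException(
            "Topic '" + topic + "' must use cleanup.policy=compact, found: " + cleanupPolicies);
    }
}
```

This mirrors why rhauch argues `debug` is sufficient for the empty case: the hard failure is the exception, not the log line.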
[jira] [Resolved] (KAFKA-9653) Duplicate tasks on workers after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis resolved KAFKA-9653. --- Resolution: Duplicate > Duplicate tasks on workers after rebalance > -- > > Key: KAFKA-9653 > URL: https://issues.apache.org/jira/browse/KAFKA-9653 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.2 >Reporter: Agam Brahma >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > Verified the following > * observed issue goes away when `connect.protocol` is switched from > `compatible` to `eager` > * Debug logs show `WorkerSourceTask` on two different nodes referencing the > same task-id > * Debug logs show the node referring to the task as part of both > `Configured assignments` and `Lost assignments`
[jira] [Updated] (KAFKA-9653) Duplicate tasks on workers after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-9653: -- Fix Version/s: 2.5.1 2.4.2 2.6.0 2.3.2 > Duplicate tasks on workers after rebalance > -- > > Key: KAFKA-9653 > URL: https://issues.apache.org/jira/browse/KAFKA-9653 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.2 >Reporter: Agam Brahma >Assignee: Konstantine Karantasis >Priority: Major > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > Verified the following > * observed issue goes away when `connect.protocol` is switched from > `compatible` to `eager` > * Debug logs show `WorkerSourceTask` on two different nodes referencing the > same task-id > * Debug logs show the node referring to the task as part of both > `Configured assignments` and `Lost assignments`
[jira] [Commented] (KAFKA-9653) Duplicate tasks on workers after rebalance
[ https://issues.apache.org/jira/browse/KAFKA-9653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132876#comment-17132876 ] Konstantine Karantasis commented on KAFKA-9653: --- Closing this issue as fixed but also duplicate, given that the known underlying issues have now been merged. > Duplicate tasks on workers after rebalance > -- > > Key: KAFKA-9653 > URL: https://issues.apache.org/jira/browse/KAFKA-9653 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.2 >Reporter: Agam Brahma >Assignee: Konstantine Karantasis >Priority: Major > > Verified the following > * observed issue goes away when `connect.protocol` is switched from > `compatible` to `eager` > * Debug logs show `WorkerSourceTask` on two different nodes referencing the > same task-id > * Debug logs show the node referring to the task as part of both > `Configured assignments` and `Lost assignments`
[jira] [Commented] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment
[ https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132874#comment-17132874 ] Konstantine Karantasis commented on KAFKA-9841: --- This fix is now merged. Seems it can make {{2.5.1}}. Thanks for checking [~vvcephei] and thanks for the contribution [~LucentWong] > Connector and Task duplicated when a worker join with old generation > assignment > --- > > Key: KAFKA-9841 > URL: https://issues.apache.org/jira/browse/KAFKA-9841 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1, 2.4.1 >Reporter: Yu Wang >Assignee: Yu Wang >Priority: Major > Labels: pull-request-available > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > When using IncrementalCooperativeAssignor.class to assign connectors and > tasks, suppose a worker 'W' has a connection issue with the coordinator. > During the connection issue, the connectors/tasks on 'W' are assigned to the > other workers. > When the connection issue disappears, 'W' will join the group with an old > generation assignment. Then the group leader will get duplicated > connectors/tasks in the metadata sent by the workers. But the duplicated > connectors/tasks will not be revoked. 
> > Generation 3: > Worker1: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker3: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker4: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], > taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > 
Worker5: > [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} > with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > > Generation 4: > Worker1: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[],
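The symptom in the assignment logs above is that, when a worker rejoins with a stale-generation assignment, the leader ends up seeing the same task id reported by more than one worker. A small illustrative helper (not Connect's actual `IncrementalCooperativeAssignor` code; the class and method names are hypothetical) that detects such duplicates from per-worker assignment lists:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch of the symptom described in KAFKA-9841: given each
// worker's reported owned tasks, find task ids claimed by more than one worker.
public class DuplicateTaskFinder {
    public static Set<String> duplicatedTasks(Map<String, List<String>> ownedByWorker) {
        Map<String, Integer> ownerCounts = new HashMap<>();
        for (List<String> tasks : ownedByWorker.values()) {
            for (String task : tasks) {
                ownerCounts.merge(task, 1, Integer::sum); // count owners per task
            }
        }
        Set<String> duplicated = new TreeSet<>();
        for (Map.Entry<String, Integer> e : ownerCounts.entrySet()) {
            if (e.getValue() > 1) {
                duplicated.add(e.getKey()); // owned by two or more workers
            }
        }
        return duplicated;
    }
}
```

In a healthy generation this set is empty; after the stale rejoin described above, it would contain the un-revoked task ids.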
[GitHub] [kafka] mjsax commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
mjsax commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642381041 Btw: checkstyle failed :)
[GitHub] [kafka] mjsax commented on pull request #8847: KAFKA-7833: Add missing test
mjsax commented on pull request #8847: URL: https://github.com/apache/kafka/pull/8847#issuecomment-642380670 Merged to `trunk` and cherry-picked to `2.6` branch.
[jira] [Updated] (KAFKA-9845) plugin.path property does not work with config provider
[ https://issues.apache.org/jira/browse/KAFKA-9845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9845: - Fix Version/s: 2.5.1 2.4.2 2.6.0 > plugin.path property does not work with config provider > --- > > Key: KAFKA-9845 > URL: https://issues.apache.org/jira/browse/KAFKA-9845 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0, 2.4.0, 2.3.1, 2.5.0, 2.4.1 >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Minor > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > The config provider mechanism doesn't work if used for the {{plugin.path}} > property of a standalone or distributed Connect worker. This is because the > {{Plugins}} instance which performs plugin path scanning is created using the > raw worker config, pre-transformation (see > [ConnectStandalone|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java#L79] > and > [ConnectDistributed|https://github.com/apache/kafka/blob/371ad143a6bb973927c89c0788d048a17ebac91a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java#L91]). > Unfortunately, because config providers are loaded as plugins, there's a > circular dependency issue here. The {{Plugins}} instance needs to be created > _before_ the {{DistributedConfig}}/{{StandaloneConfig}} is created in order > for the config providers to be loaded correctly, and the config providers > need to be loaded in order to perform their logic on any properties > (including the {{plugin.path}} property). > There is no clear fix for this issue in the code base, and the only known > workaround is to refrain from using config providers for the {{plugin.path}} > property. 
> A couple improvements could potentially be made to improve the UX when this > issue arises: > # Alter the config logging performed by the {{DistributedConfig}} and > {{StandaloneConfig}} classes to _always_ log the raw value for the > {{plugin.path}} property. Right now, the transformed value is logged even > though it isn't used, which is likely to cause confusion. > # Issue a {{WARN}}- or even {{ERROR}}-level log message when it's detected > that the user is attempting to use config providers for the {{plugin.path}} > property, which states that config providers cannot be used for that specific > property, instructs them to change the value for the property accordingly, > and/or informs them of the actual value that the framework will use for that > property when performing plugin path scanning. > We should _not_ throw an error on startup if this condition is detected, as > this could cause previously-functioning, benignly-misconfigured Connect > workers to fail to start after an upgrade. 
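The second proposed improvement amounts to detecting that the raw `plugin.path` value still contains a config-provider placeholder at scan time. A hedged sketch of such a detection helper (the placeholder syntax mirrors Connect's `${provider:path:key}` form, but this class is illustrative, not actual Connect code):

```java
// Hypothetical helper in the spirit of improvement #2 above: plugin scanning
// happens on the RAW worker config, before config providers (themselves
// plugins) can be loaded, so an unresolved ${...} placeholder in plugin.path
// is a sign the worker should WARN that the raw value will be used as-is.
public class PluginPathCheck {
    public static boolean usesConfigProvider(String rawPluginPath) {
        // Matches a ${provider:path:key}-style placeholder anywhere in the value.
        return rawPluginPath != null
            && rawPluginPath.matches(".*\\$\\{[^}]+:[^}]+\\}.*");
    }
}
```

A worker could call this on the pre-transformation value and log the warning described in the ticket instead of failing startup.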
[jira] [Created] (KAFKA-10145) Enhance to support the multiple join operation
lqjacklee created KAFKA-10145: - Summary: Enhance to support the multiple join operation Key: KAFKA-10145 URL: https://issues.apache.org/jira/browse/KAFKA-10145 Project: Kafka Issue Type: Improvement Components: streams Reporter: lqjacklee Currently Kafka Streams supports joining two streams, where the join relationship is explicit. However, in some cases the data comes from multiple sources/streams whose relationship is not fixed. For example, consider the case where an end user visits a website or clicks an item they are interested in. Each time such an event occurs, the system posts it to a Kafka topic, and we compute results based on the click stream and the view stream. 1. Click events come from the click stream. 2. View events come from the view stream. 3. Ultimately we only care about the ClickView aggregation domain object. When a click event occurs, we update the click event and the aggregation object; when a view event occurs, we update the view event and the aggregation. The ClickView aggregation object is persisted and is only updated by click events and view events; once both have been applied, its complete() method returns true.
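The aggregation described in the ticket above can be sketched as a plain domain object. The class and method names below are illustrative readings of the ticket, not an existing Streams API:

```java
// Hypothetical sketch of the ClickView aggregation object from KAFKA-10145:
// it is updated independently by click and view events, and complete()
// reports whether both event types have been applied.
public class ClickViewAggregation {
    private boolean clickSeen;
    private boolean viewSeen;

    public void onClick() { clickSeen = true; } // update from the click stream
    public void onView()  { viewSeen = true; }  // update from the view stream

    // true once the aggregate has been updated by both a click and a view
    public boolean complete() { return clickSeen && viewSeen; }
}
```

In a Streams topology this object would be the aggregate value, updated by whichever stream's event arrives, with downstream logic gated on `complete()`.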
[jira] [Updated] (KAFKA-9985) Sink connector consuming DLQ topic may exhaust broker
[ https://issues.apache.org/jira/browse/KAFKA-9985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9985: - Fix Version/s: (was: 2.5.2) 2.5.1 > Sink connector consuming DLQ topic may exhaust broker > - > > Key: KAFKA-9985 > URL: https://issues.apache.org/jira/browse/KAFKA-9985 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.3.1, 2.5.0, 2.4.1 >Reporter: Mario Molina >Assignee: Mario Molina >Priority: Major > Fix For: 2.6.0, 2.4.2, 2.5.1 > > > When a sink connector is configured with a DLQ and its topic is the same as > (or matches) the topic from which the connector reads, the broker and/or > connector might be exhausted if the record sent to the topic is invalid. > Depending on the broker/connect config, the connector might fail with a > RecordTooLargeException before exhausting the broker/connector.
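The failure mode above is a feedback loop: a sink connector whose DLQ topic is also one of the topics it consumes will re-read its own dead letters. A hedged sketch of the kind of configuration guard that would prevent it (illustrative only; this is not how Connect necessarily implements the fix):

```java
import java.util.Set;

// Hypothetical validation helper for the feedback loop described in KAFKA-9985:
// reject a sink connector config whose DLQ topic is among its consumed topics.
public class DlqConfigCheck {
    public static void validate(String dlqTopic, Set<String> consumedTopics) {
        if (consumedTopics.contains(dlqTopic)) {
            throw new IllegalArgumentException(
                "DLQ topic '" + dlqTopic + "' is also consumed by the connector; "
                + "records routed to the DLQ would be re-consumed in a loop");
        }
    }
}
```

A regex-subscribed connector would need the analogous check against its topic pattern rather than a literal topic set.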
[jira] [Updated] (KAFKA-9841) Connector and Task duplicated when a worker join with old generation assignment
[ https://issues.apache.org/jira/browse/KAFKA-9841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Konstantine Karantasis updated KAFKA-9841: -- Fix Version/s: 2.4.2 2.6.0 2.3.2 > Connector and Task duplicated when a worker join with old generation > assignment > --- > > Key: KAFKA-9841 > URL: https://issues.apache.org/jira/browse/KAFKA-9841 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.4.0, 2.3.1, 2.4.1 >Reporter: Yu Wang >Assignee: Yu Wang >Priority: Major > Labels: pull-request-available > Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1 > > > When using IncrementalCooperativeAssignor.class to assign connectors and > tasks, suppose a worker 'W' has a connection issue with the coordinator. > During the connection issue, the connectors/tasks on 'W' are assigned to the > other workers. > When the connection issue disappears, 'W' will join the group with an old > generation assignment. Then the group leader will get duplicated > connectors/tasks in the metadata sent by the workers. But the duplicated > connectors/tasks will not be revoked. 
> > Generation 3: > Worker1: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker3: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-3], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker4: > [2020-03-17 04:31:23,481] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[misc], > taskIds=[misc-1], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > 
Worker5: > [2020-03-17 04:31:23,482] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 3 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-ae2a2c31-fe73-4134-a376-4c4af8f466d0', > leaderUrl='http://xx-2:8083/', offset=514, connectorIds=[], > taskIds=[misc-5, misc-2], revokedConnectorIds=[], revokedTaskIds=[], delay=0} > with rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > > Generation 4: > Worker1: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder) > Worker2: > [2020-03-17 04:32:37,165] INFO [Worker clientId=connect-1, > groupId=xx_mm2_fb__connect__group] Joined group at generation 4 with > protocol version 2 and got assignment: Assignment\{error=0, > leader='connect-1-2a332d4a-ef64-4b45-89c4-55f48d58f28c', > leaderUrl='http://xx-4:8083/', offset=515, connectorIds=[], > taskIds=[misc-4], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 >
[GitHub] [kafka] rhauch merged pull request #8455: KAFKA-9845: Warn users about using config providers with plugin.path property
rhauch merged pull request #8455: URL: https://github.com/apache/kafka/pull/8455
[GitHub] [kafka] mjsax merged pull request #8847: KAFKA-7833: Add missing test
mjsax merged pull request #8847: URL: https://github.com/apache/kafka/pull/8847
[GitHub] [kafka] vvcephei commented on a change in pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective
vvcephei commented on a change in pull request #8818: URL: https://github.com/apache/kafka/pull/8818#discussion_r438517226 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ## @@ -86,7 +87,7 @@ private boolean rebalanceInProgress = false; // if we are in the middle of a rebalance, it is not safe to commit // includes assigned & initialized tasks and unassigned tasks we locked temporarily during rebalance -private Set lockedTaskDirectories = new HashSet<>(); +private final Set lockedTaskDirectories = new HashSet<>(); Review comment: Yeah, I thought that was weird when I added the "final" checkstyle rule. Oh well...
[GitHub] [kafka] rhauch merged pull request #8502: KAFKA-9066: Retain metrics for failed tasks
rhauch merged pull request #8502: URL: https://github.com/apache/kafka/pull/8502
[GitHub] [kafka] mjsax commented on pull request #8200: KAFKA-5876: IQ should throw different exceptions for different errors(part 1)
mjsax commented on pull request #8200: URL: https://github.com/apache/kafka/pull/8200#issuecomment-642377008 @vitojeng -- Jenkins got locked down a couple of weeks ago. Only committers can trigger builds now.
[GitHub] [kafka] vvcephei commented on pull request #8818: KAFKA-10086: Integration test for ensuring warmups are effective
vvcephei commented on pull request #8818: URL: https://github.com/apache/kafka/pull/8818#issuecomment-642376894 Thanks for the review, @mjsax; I've addressed your comments.
[GitHub] [kafka] C0urante commented on pull request #8455: KAFKA-9845: Warn users about using config providers with plugin.path property
C0urante commented on pull request #8455: URL: https://github.com/apache/kafka/pull/8455#issuecomment-642375059 @rhauch fine by me, applied the suggestions. Thanks for taking a look!
[GitHub] [kafka] vvcephei commented on pull request #8835: MINOR: reduce sizeInBytes for percentiles metrics
vvcephei commented on pull request #8835: URL: https://github.com/apache/kafka/pull/8835#issuecomment-642375091 Test this please
[GitHub] [kafka] ableegoldman commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
ableegoldman commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642374804 Tests are still a WIP. Just wanted to open it up for input on the problem/fix.
[GitHub] [kafka] rhauch merged pull request #8848: MINOR: Fix PluginUtilsTest that resulted from a bad backport
rhauch merged pull request #8848: URL: https://github.com/apache/kafka/pull/8848
[GitHub] [kafka] mjsax commented on a change in pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
mjsax commented on a change in pull request #8849: URL: https://github.com/apache/kafka/pull/8849#discussion_r438514181 ## File path: streams/src/test/java/org/apache/kafka/streams/integration/StandbyTaskEOSIntegrationTest.java ## @@ -92,7 +92,7 @@ public void createTopics() throws Exception { } @Test -public void surviveWithOneTaskAsStandby() throws ExecutionException, InterruptedException, IOException { +public void surviveWithOneTaskAsStandby() throws InterruptedException, IOException { Review comment: nit: just simplify to `throws Exception`
[GitHub] [kafka] rhauch commented on a change in pull request #8455: KAFKA-9845: Warn users about using config providers with plugin.path property
rhauch commented on a change in pull request #8455: URL: https://github.com/apache/kafka/pull/8455#discussion_r438513748 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -379,6 +382,22 @@ private void logDeprecatedProperty(String propName, String propValue, String def } } +private void logPluginPathConfigProviderWarning(Map rawOriginals) { +String rawPluginPath = rawOriginals.get(PLUGIN_PATH_CONFIG); +// Can't use AbstractConfig::originalsStrings here since some values may be null, which +// causes that method to fail +String transformedPluginPath = Objects.toString(originals().get(PLUGIN_PATH_CONFIG)); +if (!Objects.equals(rawPluginPath, transformedPluginPath)) { +log.warn( +"Config providers do not work with the plugin.path property. The raw value '{}' " ++ "will be used for plugin scanning, as opposed to the transformed value '{}'. " ++ "See https://issues.apache.org/jira/browse/KAFKA-9845 for more information.", Review comment: How about: ```suggestion "Variables cannot be used in the 'plugin.path' property, since the property is " + "used by plugin scanning before the config providers that replace the " + "variables are initialized. 
The raw value '{}' was used for plugin scanning, as " + "opposed to the transformed value '{}', and this may cause unexpected results.", ``` ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java ## @@ -205,7 +206,9 @@ + "plugins and their dependencies\n" + "Note: symlinks will be followed to discover dependencies or plugins.\n" + "Examples: plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins," -+ "/opt/connectors"; ++ "/opt/connectors\n" ++ "Warning: Config providers will not take effect if used for the value of this " ++ "property, and instead the raw, non-transformed value will be used."; Review comment: How about: ```suggestion + "/opt/connectors\n" + "Do not use config provider variables in this property, since the raw path is used " + "by the worker's scanner before config providers are initialized and used to " + "replace variables."; ```
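The review above hinges on a null-safe comparison between the raw and transformed `plugin.path` values -- the reason `AbstractConfig::originalsStrings` can't be used is that the transformed value may be null. A minimal sketch of that check (simplified; `shouldWarn` is a made-up name, not the actual WorkerConfig method):

```java
import java.util.Objects;

public class PluginPathWarning {
    // Objects.toString handles a null transformed value, which is what makes
    // this comparison safe where originalsStrings() would fail.
    static boolean shouldWarn(String rawPluginPath, Object transformedPluginPath) {
        return !Objects.equals(rawPluginPath, Objects.toString(transformedPluginPath));
    }

    public static void main(String[] args) {
        // A plain path transforms to itself: no warning.
        System.out.println(shouldWarn("/opt/plugins", "/opt/plugins"));
        // A config-provider variable that was never resolved: warn.
        System.out.println(shouldWarn("${file:/tmp/secrets:path}", null));
    }
}
```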
[GitHub] [kafka] mjsax commented on pull request #8835: MINOR: reduce sizeInBytes for percentiles metrics
mjsax commented on pull request #8835: URL: https://github.com/apache/kafka/pull/8835#issuecomment-642373516
[GitHub] [kafka] vvcephei commented on pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
vvcephei commented on pull request #8849: URL: https://github.com/apache/kafka/pull/8849#issuecomment-642373459 Test this please
[GitHub] [kafka] kkonstantine commented on pull request #8848: MINOR: Fix PluginUtilsTest that resulted from a bad backport
kkonstantine commented on pull request #8848: URL: https://github.com/apache/kafka/pull/8848#issuecomment-642370342 Thanks @rhauch.
[GitHub] [kafka] whsoul commented on pull request #7965: New Kafka Connect SMT for plainText => Struct(or Map)
whsoul commented on pull request #7965: URL: https://github.com/apache/kafka/pull/7965#issuecomment-642370021 @rhauch thanks! I already created the JIRA issue (https://issues.apache.org/jira/browse/KAFKA-9436) but forgot the "needs-kip" label... it's added now. Thanks~
[jira] [Updated] (KAFKA-9436) New Kafka Connect SMT for plainText => Struct(or Map)
[ https://issues.apache.org/jira/browse/KAFKA-9436?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] whsoul updated KAFKA-9436: -- Labels: needs-kip (was: ) > New Kafka Connect SMT for plainText => Struct(or Map) > - > > Key: KAFKA-9436 > URL: https://issues.apache.org/jira/browse/KAFKA-9436 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: whsoul >Priority: Major > Labels: needs-kip > > I'd like to parse and convert plain text rows to struct(or map) data, and > load into documented database such as mongoDB, elasticSearch, etc... with SMT > > For example > > 1. String parse ( with timemillis ) > {code:java} > { >"code" : "dev_kafka_pc001_1580372261372" >,"recode1" : "a" >,"recode2" : "b" > }{code} > {code:java} > "transforms": "RegexTransform", > "transforms.RegexTransform.type": > "org.apache.kafka.connect.transforms.ToStructByRegexTransform$Value", > "transforms.RegexTransform.struct.field": "message", > "transforms.RegexTransform.regex": > "^(.{3,4})_(.*)_(pc|mw|ios|and)([0-9]{3})_([0-9]{13})" > "transforms.RegexTransform.mapping": > "env,serviceId,device,sequence,datetime:TIMEMILLIS"{code} > > > 2. plain text apache log > {code:java} > "111.61.73.113 - - [08/Aug/2019:18:15:29 +0900] \"OPTIONS > /api/v1/service_config HTTP/1.1\" 200 - 101989 \"http://local.test.com/\; > \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_5) AppleWebKit/537.36 (KHTML, > like Gecko) Chrome/75.0.3770.142 Safari/537.36\"" > {code} > SMT connect config with regular expression below can easily transform a plain > text to struct (or map) data. > > {code:java} > "transforms": "RegexTransform", > "transforms.RegexTransform.type": > "org.apache.kafka.connect.transforms.ToStructByRegexTransform$Value", > "transforms.RegexTransform.struct.field": "message", > "transforms.RegexTransform.regex": "^([\\d.]+) (\\S+) (\\S+) > \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(GET|POST|OPTIONS|HEAD|PUT|DELETE|PATCH) > (.+?) 
(.+?)\" (\\d{3}) ([0-9|-]+) ([0-9|-]+) \"([^\"]+)\" \"([^\"]+)\"" > "transforms.RegexTransform.mapping": > "IP,RemoteUser,AuthedRemoteUser,DateTime,Method,Request,Protocol,Response,BytesSent,Ms:NUMBER,Referrer,UserAgent" > {code} > > I have a PR for this -- This message was sent by Atlassian Jira (v8.3.4#803005)
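The core idea of the proposed SMT -- mapping regex capture groups positionally onto field names from the `mapping` config -- can be sketched as follows (a toy model for illustration, not the actual transform code; type annotations like `:TIMEMILLIS` are omitted):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RegexToMap {
    // Capture group i+1 is assigned to fields.get(i); returns an empty map
    // when the line does not match the regex.
    static Map<String, String> parse(String regex, List<String> fields, String line) {
        Matcher m = Pattern.compile(regex).matcher(line);
        Map<String, String> out = new LinkedHashMap<>();
        if (m.matches()) {
            for (int i = 0; i < fields.size(); i++) {
                out.put(fields.get(i), m.group(i + 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // The first example from the JIRA description above.
        System.out.println(parse(
            "^(.{3,4})_(.*)_(pc|mw|ios|and)([0-9]{3})_([0-9]{13})",
            List.of("env", "serviceId", "device", "sequence", "datetime"),
            "dev_kafka_pc001_1580372261372"));
    }
}
```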
[GitHub] [kafka] rhauch edited a comment on pull request #8502: KAFKA-9066: Retain metrics for failed tasks
rhauch edited a comment on pull request #8502: URL: https://github.com/apache/kafka/pull/8502#issuecomment-642360742 @C0urante the original PR description doesn't mention that the task metrics for a failed task will also be removed when the connector is deleted (in addition to the worker stopping or the tasks completing gracefully). Can you confirm my understanding, and if I'm right can you please update the description?
[GitHub] [kafka] ableegoldman opened a new pull request #8849: KAFKA-10144: clean up corrupted standby tasks before attempting a commit
ableegoldman opened a new pull request #8849: URL: https://github.com/apache/kafka/pull/8849 We need to make sure that corrupted standby tasks are actually cleaned up upon a TaskCorruptedException. However, due to the `commit` prior to invoking `handleCorruption`, it's possible to throw a TaskMigratedException before actually cleaning up any of the corrupted tasks. This is fine for active tasks since `handleLostAll` will finish up the job, but it does nothing with standby tasks. We should make sure that standby tasks are handled before attempting to commit (which we can do, since we don't need to commit anything for the corrupted standbys).
[jira] [Created] (KAFKA-10144) Corrupted standby tasks are not always cleaned up
Sophie Blee-Goldman created KAFKA-10144: --- Summary: Corrupted standby tasks are not always cleaned up Key: KAFKA-10144 URL: https://issues.apache.org/jira/browse/KAFKA-10144 Project: Kafka Issue Type: Bug Components: streams Reporter: Sophie Blee-Goldman Assignee: Sophie Blee-Goldman Fix For: 2.6.0 Thread death on the 2.6-eos-beta soak was due to re-registration of a standby task changelog that was already registered. The root cause was that the task had been marked corrupted, but `commit` threw a TaskMigratedException before we could get to calling TaskManager#handleCorruption and properly clean up the task. For corrupted active tasks this is not a problem, since #handleLostAll will then finish the cleanup. But we intentionally don't clear standby tasks on TaskMigratedException, leaving the task corrupted and partially registered
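The ordering fix this issue calls for can be modeled as follows (a toy sketch only; the class, method, and operation names are illustrative and not the real TaskManager API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class CorruptionOrder {
    // Model of the fix: corrupted standbys are cleaned up *before* the commit,
    // so a TaskMigratedException thrown during commit can no longer leave a
    // standby task half-registered.
    static List<String> handleCorruption(List<String> tasks,
                                         Set<String> corrupted,
                                         Set<String> active) {
        List<String> ops = new ArrayList<>();
        // Step 1: close corrupted standbys first -- nothing needs committing for them.
        for (String t : tasks) {
            if (corrupted.contains(t) && !active.contains(t)) {
                ops.add("cleanup-standby:" + t);
            }
        }
        // Step 2: only then commit the remaining, non-corrupted tasks. If this
        // throws a TaskMigratedException, handleLostAll still covers actives.
        for (String t : tasks) {
            if (!corrupted.contains(t)) {
                ops.add("commit:" + t);
            }
        }
        return ops;
    }
}
```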
[GitHub] [kafka] rhauch opened a new pull request #8848: MINOR: Fix PluginUtilsTest that resulted from a bad backport
rhauch opened a new pull request #8848: URL: https://github.com/apache/kafka/pull/8848 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] C0urante commented on pull request #8502: KAFKA-9066: Retain metrics for failed tasks
C0urante commented on pull request #8502: URL: https://github.com/apache/kafka/pull/8502#issuecomment-642362050 @rhauch sure, I can update the description to make it clear that we will still remove task metrics when a connector is deleted.
[GitHub] [kafka] jiameixie commented on pull request #8845: KAFKA-10126:Add a warning message for ConsumerPerformance
jiameixie commented on pull request #8845: URL: https://github.com/apache/kafka/pull/8845#issuecomment-642361238 @abbccdda @chia7712 Thanks for your advice. I have updated it.
[GitHub] [kafka] rhauch commented on pull request #8502: KAFKA-9066: Retain metrics for failed tasks
rhauch commented on pull request #8502: URL: https://github.com/apache/kafka/pull/8502#issuecomment-642360742 @C0urante the original description doesn't mention that the task metrics for a failed task will also be removed when the connector is deleted (in addition to the worker stopping or the tasks completing gracefully). Can you confirm my understanding, and if I'm right can you please update the description?
[GitHub] [kafka] C0urante commented on pull request #8502: KAFKA-9066: Retain metrics for failed tasks
C0urante commented on pull request #8502: URL: https://github.com/apache/kafka/pull/8502#issuecomment-642357185 @rhauch it'd be nice to verify this in tests but unfortunately I didn't notice your comment earlier in the day and I don't think I have time to adjust/write new test cases to account for this. It also looks like the possibility you pointed out about hindering re-creation of task metrics isn't super likely given that tasks proactively clear out any metrics set up for older instances on startup: https://github.com/apache/kafka/blob/6abb913c6449113141582921340994c0d5e50839/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java#L352-L353 If this is still a sticking point, could we log a backlog ticket for those testing improvements and address it as time permits after the 2.6 release?
[GitHub] [kafka] rhauch commented on a change in pull request #7496: KAFKA-9018: Throw clearer exceptions on serialisation errors
rhauch commented on a change in pull request #7496: URL: https://github.com/apache/kafka/pull/7496#discussion_r438497632 ## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java ## @@ -507,6 +508,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord msg, boolean isKey) { +try { +byte[] value = isKey ? msg.key() : msg.value(); +Converter converter = isKey ? keyConverter : valueConverter; +return converter.toConnectData(msg.topic(), msg.headers(), value); +} catch (Exception e) { +log.error("Error converting message {} in topic '{}' partition {} at offset {}", +isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName(), msg.topic(), msg.partition(), msg.offset()); +throw e; +} +} + Review comment: Since the calling code already knows whether it's a key or value, how about just having separate methods? Yeah, they'd be mostly the same, but we could avoid the superfluous logic and could simplify things a bit. Also, would it be better to wrap the exception rather than just log the error? Especially with the retry operator, it's possible that the error won't get logged near this log message, so we'd lose the correlation.
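The reviewer's suggestion to wrap rather than log-and-rethrow (so the topic/partition/offset context travels with the exception, even through the retry operator) could look roughly like this (a hypothetical helper; the real Connect code would use its own exception types rather than a plain RuntimeException):

```java
public class ConversionError {
    // Carry the conversion context in the exception itself, so the failure
    // stays correlated with its record no matter where it is finally logged.
    static RuntimeException wrap(Exception cause, String keyOrValue,
                                 String topic, int partition, long offset) {
        return new RuntimeException(String.format(
            "Error converting message %s in topic '%s' partition %d at offset %d",
            keyOrValue, topic, partition, offset), cause);
    }

    public static void main(String[] args) {
        RuntimeException e = wrap(new IllegalStateException("boom"), "value", "orders", 3, 42L);
        System.out.println(e.getMessage());
        System.out.println(e.getCause().getMessage());
    }
}
```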
[jira] [Updated] (KAFKA-9067) BigDecimal conversion unnecessarily enforces the scale
[ https://issues.apache.org/jira/browse/KAFKA-9067?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9067: - Labels: needs-kip (was: ) > BigDecimal conversion unnecessarily enforces the scale > --- > > Key: KAFKA-9067 > URL: https://issues.apache.org/jira/browse/KAFKA-9067 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Affects Versions: 2.3.0 >Reporter: Piotr Smolinski >Priority: Major > Labels: needs-kip > > In Kafka Connect schema framework it is possible to use fixed point decimal > numbers mapped as logical type Decimal. The type is related to Avro defined > logical type. When the type is used, the scale value is stored in the schema > definition (later it might end in Avro schema) and the unscaled value is > stored as integer of unbounded size. > The problem arises when the decimal value to decode has different scale than > the one declared in the schema. During conversion to Avro or JSON using > standard converters the operation fails with DataException. > The proposed solution is to use setScale method to adapt the scale to the > correct value and provide rounding mode as parameter to the schema: > https://docs.oracle.com/javase/8/docs/api/java/math/BigDecimal.html#setScale-int-java.math.RoundingMode- >
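The setScale-based fix the issue proposes is easy to demonstrate (`coerceScale` is a hypothetical helper, not Connect API; the rounding mode would come from the schema parameter the issue describes):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalScaleFix {
    // Adapt an incoming BigDecimal to the scale declared in the schema instead
    // of failing with a DataException on mismatch.
    static BigDecimal coerceScale(BigDecimal value, int schemaScale, RoundingMode mode) {
        return value.scale() == schemaScale ? value : value.setScale(schemaScale, mode);
    }

    public static void main(String[] args) {
        BigDecimal in = new BigDecimal("12.3456");                  // scale 4
        System.out.println(coerceScale(in, 2, RoundingMode.HALF_UP)); // prints 12.35
    }
}
```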
[jira] [Updated] (KAFKA-9318) Kafka Connect. Add map entry value extraction SMT
[ https://issues.apache.org/jira/browse/KAFKA-9318?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-9318: - Labels: needs-kip (was: ) > Kafka Connect. Add map entry value extraction SMT > - > > Key: KAFKA-9318 > URL: https://issues.apache.org/jira/browse/KAFKA-9318 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.4.0 >Reporter: Piotr Smolinski >Priority: Major > Labels: needs-kip > > Currently there is ExtractField SMT available that makes it possible to pull > specific field from generic JSON or Connect Struct. When Connect Struct with > a map field is used this approach does not work when given map entry has to > be extracted. > Example case: JMS source imports message that has entity correlation key > inside message property. We want to use the same key for Kafka messages. > Currently it requires custom SMT coding, while the logic is generic.
[jira] [Updated] (KAFKA-10115) Incorporate errors.tolerance with the Errant Record Reporter
[ https://issues.apache.org/jira/browse/KAFKA-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch updated KAFKA-10115: -- Priority: Major (was: Minor) > Incorporate errors.tolerance with the Errant Record Reporter > > > Key: KAFKA-10115 > URL: https://issues.apache.org/jira/browse/KAFKA-10115 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 2.6.0 >Reporter: Aakash Shah >Assignee: Aakash Shah >Priority: Major > Fix For: 2.6.0 > > > The errors.tolerance config is currently not being used when using the Errant > Record Reporter. If errors.tolerance is none then the task should fail after > sending it to the DLQ in Kafka.
[jira] [Commented] (KAFKA-10115) Incorporate errors.tolerance with the Errant Record Reporter
[ https://issues.apache.org/jira/browse/KAFKA-10115?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132823#comment-17132823 ] Randall Hauch commented on KAFKA-10115: --- This is related to KAFKA-9971.
[jira] [Commented] (KAFKA-9314) Connect put() and poll() retries not conforming to KIP-298
[ https://issues.apache.org/jira/browse/KAFKA-9314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17132822#comment-17132822 ] Randall Hauch commented on KAFKA-9314: -- This seems at least related to [KIP-610|https://cwiki.apache.org/confluence/display/KAFKA/KIP-610%3A+Error+Reporting+in+Sink+Connectors]. [~nigel.liang], would you mind taking a look and considering whether that change would handle the request here? If not, then would this potentially need a KIP to alter the retry behavior? (If so, we need to add the `needs-kip` label.) > Connect put() and poll() retries not conforming to KIP-298 > -- > > Key: KAFKA-9314 > URL: https://issues.apache.org/jira/browse/KAFKA-9314 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Nigel Liang >Assignee: Nigel Liang >Priority: Major > > KIP-298 outlines the retry policy of Connect when errors are encountered. In > particular, it proposes to retry on {{RetriableException}} on put() in > SinkTask and poll() in SourceTask. > However, the code does not reflect this change. For instance, > WorkerSourceTask handles {{RetriableException}} thrown from {{poll()}} by > entering into a tight retry loop without backoff. This has led to connectors > having to workaround by simply not retrying and failing the task always. > Users would need to manually restart the task to recover from even simple > network glitches. > AFAICT from reading code, the same is true for {{WorkerSinkTask}} when > calling {{put()}}.
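The backoff that KIP-298 envisions in place of the tight retry loop might look like this (an illustrative sketch; the method and parameter names are made up, not the Connect worker code):

```java
public class RetryBackoff {
    // attempt 0 -> initialMs, doubling on each retry, never exceeding maxMs.
    static long backoffMs(int attempt, long initialMs, long maxMs) {
        long delay = initialMs << Math.min(attempt, 20); // clamp shift to avoid overflow
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        // e.g. on RetriableException from poll()/put(), sleep before retrying:
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("retry " + attempt + " after "
                + backoffMs(attempt, 100, 10_000) + " ms");
        }
    }
}
```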
[GitHub] [kafka] ableegoldman commented on a change in pull request #8787: KAFKA-10085: correctly compute lag for optimized source changelogs
ableegoldman commented on a change in pull request #8787: URL: https://github.com/apache/kafka/pull/8787#discussion_r438495057 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ## @@ -763,18 +778,36 @@ private boolean populateClientStatesMap(final Map clientState .flatMap(Collection::stream) .collect(Collectors.toList()); -final Collection allPreexistingChangelogPartitions = new ArrayList<>(allChangelogPartitions); -allPreexistingChangelogPartitions.removeIf(partition -> newlyCreatedChangelogs.contains(partition.topic())); +final Set preexistingChangelogPartitions = new HashSet<>(); +final Set preexistingSourceChangelogPartitions = new HashSet<>(); +final Set newlyCreatedChangelogPartitions = new HashSet<>(); +for (final TopicPartition changelog : allChangelogPartitions) { +if (newlyCreatedChangelogs.contains(changelog.topic())) { +newlyCreatedChangelogPartitions.add(changelog); +} else if (optimizedSourceChangelogs.contains(changelog.topic())) { +preexistingSourceChangelogPartitions.add(changelog); +} else { +preexistingChangelogPartitions.add(changelog); +} +} + +// Make the listOffsets request first so it can fetch the offsets for non-source changelogs +// asynchronously while we use the blocking Consumer#committed call to fetch source-changelog offsets +final KafkaFuture> endOffsetsFuture = +fetchEndOffsetsFuture(preexistingChangelogPartitions, adminClient); -final Collection allNewlyCreatedChangelogPartitions = new ArrayList<>(allChangelogPartitions); - allNewlyCreatedChangelogPartitions.removeAll(allPreexistingChangelogPartitions); +final Map sourceChangelogEndOffsets = +fetchCommittedOffsets(preexistingSourceChangelogPartitions, taskManager.mainConsumer()); -final Map endOffsets = -fetchEndOffsets(allPreexistingChangelogPartitions, adminClient); +final Map endOffsets = ClientUtils.getEndOffsets(endOffsetsFuture); -allTaskEndOffsetSums = computeEndOffsetSumsByTask(endOffsets, 
changelogsByStatefulTask, allNewlyCreatedChangelogPartitions); +allTaskEndOffsetSums = computeEndOffsetSumsByTask( +changelogsByStatefulTask, +endOffsets, +sourceChangelogEndOffsets, +newlyCreatedChangelogPartitions); fetchEndOffsetsSuccessful = true; -} catch (final StreamsException e) { +} catch (final StreamsException | TimeoutException e) { Review comment: > if you throw an exception in the assignor, it just calls the assignor again in a tight loop Wouldn't the leader thread just die? Not saying that that's ideal, either. But it's at least in line with how exceptions thrown by other admin client requests in the assignment are currently handled.