[jira] [Created] (KAFKA-17534) Allow disabling heartbeats topic replication in MirrorSourceConnector
Daniel Urban created KAFKA-17534:

Summary: Allow disabling heartbeats topic replication in MirrorSourceConnector
Key: KAFKA-17534
URL: https://issues.apache.org/jira/browse/KAFKA-17534
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Reporter: Daniel Urban
Assignee: Daniel Urban

Currently, MirrorSourceConnector always replicates the heartbeats topics. In some use cases (e.g. multiple parallel, mutually exclusive MirrorSourceConnector instances in the same replication), users need to disable this behavior. Add a new property that allows disabling the heartbeats replication.

--
This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently
Daniel Urban created KAFKA-15372:

Summary: MM2 rolling restart can drop configuration changes silently
Key: KAFKA-15372
URL: https://issues.apache.org/jira/browse/KAFKA-15372
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Reporter: Daniel Urban

When MM2 is restarted, it tries to update the Connector configuration in all flows. This is a one-time attempt, and it fails if the Connect worker is not the leader of the group.

In a distributed setup with a rolling restart, it is possible that for a specific flow, the Connect worker of the just-restarted MM2 instance is not the leader, meaning that Connector configuration changes can get dropped.

For example, assuming 2 MM2 instances and one flow A->B:
# MM2 instance 1 is restarted; the worker inside MM2 instance 2 becomes the leader of the A->B Connect group.
# MM2 instance 1 tries to update the Connector configurations, but fails (instance 2 has the leader, not instance 1).
# MM2 instance 2 is restarted; leadership moves to the worker in MM2 instance 1.
# MM2 instance 2 tries to update the Connector configurations, but fails.

At this point, the configuration changes made before the restart are never applied. In many cases, this happens silently, without any indication.
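The KAFKA-15372 scenario can be replayed with a toy model. This is only a sketch under stated assumptions: the class and method names are hypothetical, leadership is assumed to move to the next instance whenever the current leader restarts, and each restarted instance makes exactly one update attempt that succeeds only if its own worker is the leader.

```java
// Toy model of the rolling restart described in KAFKA-15372.
// Nothing here is MM2 code; names and the leadership rule are assumptions.
final class RollingRestartSketch {

    // Returns true if any restarted instance managed to apply the new config.
    static boolean configAppliedAfterRollingRestart(int instances) {
        int leader = 0;          // index of the instance whose worker leads the flow
        boolean applied = false;
        for (int restarting = 0; restarting < instances; restarting++) {
            // While the leader is down, leadership moves to the next instance.
            if (leader == restarting) {
                leader = (restarting + 1) % instances;
            }
            // One-time attempt after the restart: works only on the leader.
            if (leader == restarting) {
                applied = true;
            }
        }
        return applied;
    }

    public static void main(String[] args) {
        System.out.println("2 instances, config applied: "
                + configAppliedAfterRollingRestart(2));
        System.out.println("1 instance, config applied: "
                + configAppliedAfterRollingRestart(1));
    }
}
```

In this model the update is lost whenever there is more than one instance, which mirrors the four steps listed in the issue.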
[jira] [Created] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions
Daniel Urban created KAFKA-15075:

Summary: MM2 internal checkpoints topic should support multiple partitions
Key: KAFKA-15075
URL: https://issues.apache.org/jira/browse/KAFKA-15075
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Reporter: Daniel Urban

Currently, the internal checkpoints topic of MM2 uses a single partition. This is an unnecessary limitation; the topic should support multiple partitions.
[jira] [Created] (KAFKA-14978) ExactlyOnceWorkerSourceTask does not remove parent metrics
Daniel Urban created KAFKA-14978:

Summary: ExactlyOnceWorkerSourceTask does not remove parent metrics
Key: KAFKA-14978
URL: https://issues.apache.org/jira/browse/KAFKA-14978
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Daniel Urban
Assignee: Daniel Urban

ExactlyOnceWorkerSourceTask#removeMetrics does not invoke super.removeMetrics, meaning that only the transactional metrics are removed, and the common source task metrics are not.
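The pattern behind KAFKA-14978 can be shown with a minimal sketch. The classes below are hypothetical stand-ins, not the Connect runtime classes, and the metric names are made up; the only point is that an overriding removeMetrics has to delegate to the parent.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the parent task; not a Connect class.
class SourceTaskSketch {
    protected final List<String> registeredMetrics = new ArrayList<>();

    SourceTaskSketch() {
        registeredMetrics.add("source-task-metrics");
    }

    protected void removeMetrics() {
        registeredMetrics.remove("source-task-metrics");
    }
}

// Hypothetical stand-in for the exactly-once subclass.
class ExactlyOnceTaskSketch extends SourceTaskSketch {
    ExactlyOnceTaskSketch() {
        registeredMetrics.add("transaction-metrics");
    }

    @Override
    protected void removeMetrics() {
        try {
            registeredMetrics.remove("transaction-metrics");
        } finally {
            // Without this call, "source-task-metrics" would stay registered.
            super.removeMetrics();
        }
    }

    List<String> remainingMetrics() {
        return registeredMetrics;
    }
}
```

The try/finally is a design choice of this sketch, so that the parent cleanup runs even if the subclass cleanup throws.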
[jira] [Created] (KAFKA-14903) MM2 Topic And Group Listener (KIP-918)
Daniel Urban created KAFKA-14903:

Summary: MM2 Topic And Group Listener (KIP-918)
Key: KAFKA-14903
URL: https://issues.apache.org/jira/browse/KAFKA-14903
Project: Kafka
Issue Type: New Feature
Reporter: Daniel Urban
Assignee: Daniel Urban

MM2 has a dynamic topic and group filter mechanism, in which the replicated topics/groups can dynamically change, either due to changes in the available topics/groups, or changes in the filter settings.

In order to monitor the currently replicated topics/groups, MM2 should support TopicListener and GroupListener plugins, which are triggered when MM2 changes the set of replicated topics/groups.
[jira] [Created] (KAFKA-14902) KafkaBasedLog infinite retries can lead to StackOverflowError
Daniel Urban created KAFKA-14902:

Summary: KafkaBasedLog infinite retries can lead to StackOverflowError
Key: KAFKA-14902
URL: https://issues.apache.org/jira/browse/KAFKA-14902
Project: Kafka
Issue Type: Bug
Components: KafkaConnect
Reporter: Daniel Urban
Assignee: Daniel Urban

KafkaBasedLog subclasses use an infinite retry on producer sends, using a callback. Sometimes, when specific errors are encountered, the callback is invoked in the send call, on the calling thread. If this happens enough times, a stack overflow happens.

Example stacktrace from 2.5 (but the newest code can also encounter the same):
{code:java}
2023-01-14 12:48:23,487 ERROR org.apache.kafka.connect.runtime.WorkerTask: WorkerSourceTask{id=MirrorSourceConnector-1} Task threw an uncaught and unrecoverable exception
java.lang.StackOverflowError: null
    at org.apache.kafka.common.metrics.stats.SampledStat.record(SampledStat.java:50)
    at org.apache.kafka.common.metrics.stats.Rate.record(Rate.java:60)
    at org.apache.kafka.common.metrics.stats.Meter.record(Meter.java:80)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:188)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:178)
    at org.apache.kafka.clients.producer.internals.BufferPool.recordWaitTime(BufferPool.java:202)
    at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:147)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:221)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:941)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238)
    at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298)
    ...
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:959)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238)
    at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298)
{code}
Note the repeated KafkaProducer.send -> KafkaProducer.doSend -> KafkaStatusBackingStore$4.onCompletion calls, causing the issue.
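The mechanism in KAFKA-14902 can be reproduced in miniature without Kafka. The sketch below uses hypothetical stand-in classes: a sender that always invokes its callback synchronously, a retry that re-sends from inside the callback (nesting grows with every failure), and a variant that only enqueues the retry and lets an outer loop run it. The queued variant is one possible way to keep the stack flat, assumed here for illustration; it is not claimed to be the fix applied in Kafka.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative stand-ins only; none of these are Kafka classes.
final class RetryDepthSketch {

    // Fails the first `failures` sends and always invokes the callback
    // synchronously, on the calling thread.
    static final class InlineFailingSender {
        private int remainingFailures;

        InlineFailingSender(int failures) {
            this.remainingFailures = failures;
        }

        void send(Consumer<Exception> callback) {
            if (remainingFailures > 0) {
                remainingFailures--;
                callback.accept(new RuntimeException("simulated send failure"));
            } else {
                callback.accept(null);
            }
        }
    }

    // Retries from inside the callback, so every failure nests another send.
    static final class RecursiveRetry implements Consumer<Exception> {
        private final InlineFailingSender sender;
        private int depth;
        int maxDepth;

        RecursiveRetry(InlineFailingSender sender) {
            this.sender = sender;
        }

        void attempt() {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            sender.send(this);
            depth--;
        }

        @Override
        public void accept(Exception error) {
            if (error != null) {
                attempt();
            }
        }
    }

    // Only enqueues the retry in the callback; the loop in run() executes it
    // after the failed send has returned, so nesting never grows.
    static final class QueuedRetry implements Consumer<Exception> {
        private final InlineFailingSender sender;
        private final Queue<Runnable> pending = new ArrayDeque<>();
        private int depth;
        int maxDepth;

        QueuedRetry(InlineFailingSender sender) {
            this.sender = sender;
        }

        private void attempt() {
            depth++;
            maxDepth = Math.max(maxDepth, depth);
            sender.send(this);
            depth--;
        }

        @Override
        public void accept(Exception error) {
            if (error != null) {
                pending.add(this::attempt);
            }
        }

        void run() {
            pending.add(this::attempt);
            Runnable next;
            while ((next = pending.poll()) != null) {
                next.run();
            }
        }
    }

    public static void main(String[] args) {
        RecursiveRetry recursive = new RecursiveRetry(new InlineFailingSender(100));
        recursive.attempt();
        QueuedRetry queued = new QueuedRetry(new InlineFailingSender(100));
        queued.run();
        System.out.println("recursive max nesting: " + recursive.maxDepth);
        System.out.println("queued max nesting: " + queued.maxDepth);
    }
}
```

With an unbounded number of inline failures, the recursive variant's nesting is unbounded as well, which is the shape visible in the repeated frames of the stack trace.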
[jira] [Created] (KAFKA-14838) MM2 Connector/Task clients should specify client ID based on ID and role
Daniel Urban created KAFKA-14838:

Summary: MM2 Connector/Task clients should specify client ID based on ID and role
Key: KAFKA-14838
URL: https://issues.apache.org/jira/browse/KAFKA-14838
Project: Kafka
Issue Type: Improvement
Reporter: Daniel Urban
Assignee: Daniel Urban

MM2 code creates a lot of Kafka clients internally. These clients generate a lot of logs, but since the client.id is not properly specified, connecting the dots between a specific Connector/Task and its internal client is close to impossible.

The client.id of such clients should specify the Connector name/Task ID, and should also specify the role of the client. E.g. MirrorSourceConnector uses multiple admin clients, and their client.id should reflect the difference between them.

This will help log analysis significantly, especially in MM2 mode.
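To make the KAFKA-14838 proposal concrete, here is a small sketch of a client.id builder. The issue does not prescribe a format, so the separator, the helper names and the role strings below are all assumptions chosen for illustration.

```java
// Hypothetical naming helper; the format is an assumption, not MM2 behavior.
final class ClientIdSketch {

    // client.id for a client owned by a Connector.
    static String forConnector(String connectorName, String role) {
        return connectorName + "|" + role;
    }

    // client.id for a client owned by a specific Task of a Connector.
    static String forTask(String connectorName, int taskId, String role) {
        return connectorName + "-" + taskId + "|" + role;
    }

    public static void main(String[] args) {
        // Two admin clients of the same Connector become distinguishable.
        System.out.println(forConnector("MirrorSourceConnector", "source-admin"));
        System.out.println(forConnector("MirrorSourceConnector", "target-admin"));
        System.out.println(forTask("MirrorSourceConnector", 1, "producer"));
    }
}
```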
[jira] [Created] (KAFKA-14721) Kafka listener uses wrong login class
Daniel Urban created KAFKA-14721: Summary: Kafka listener uses wrong login class Key: KAFKA-14721 URL: https://issues.apache.org/jira/browse/KAFKA-14721 Project: Kafka Issue Type: Bug Affects Versions: 3.1.2 Reporter: Daniel Urban When trying to configure a single SASL_SSL listener with GSSAPI, Scram and OAuth support, we encounter an error at startup: {code:java} 2023-02-15 13:26:04,250 ERROR kafka.server.KafkaServer: [main]: [KafkaServer id=104] Fatal error during KafkaServer startup. Prepare to shutdown org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at kafka.network.Processor.(SocketServer.scala:861) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.newProcessor(SocketServer.scala:442) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) ~[scala-library-2.13.10.jar:?] at kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) ~[scala-library-2.13.10.jar:?] 
at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) ~[scala-library-2.13.10.jar:?] at scala.collection.AbstractIterable.foreach(Iterable.scala:933) ~[scala-library-2.13.10.jar:?] at kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.network.SocketServer.startup(SocketServer.scala:131) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.server.KafkaServer.startup(KafkaServer.scala:310) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at kafka.Kafka$.main(Kafka.scala:109) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1(Kafka.scala:107) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1$adapted(Kafka.scala:107) ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:118) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:110) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala) [kafka_2.13-3.1.2.7.1.9.0-15.jar:?] Caused by: java.lang.IllegalArgumentException: No serviceName defined in either JAAS or Kafka config at org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:309) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.security.authenticator.LoginManager.(LoginManager.java:61) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170) ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?] ... 
21 more{code} Using the following configs in a Kafka broker: jaas configuration file: {code:java} KafkaServer { com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true useKeyTab=true storeKey=true serviceName="kafka" keyTab="/var/KAFKA_BROKER/kafka.keytab" principal="kafka/hgiovr@SITE"; org.apache.kafka.common.security.scram.ScramLoginModule required; };{code} and the following properties: {code:java} listener.name.sasl_ssl.sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required; listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuth
[jira] [Resolved] (KAFKA-14716) Connect schema does not allow struct default values
[ https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-14716. -- Resolution: Duplicate > Connect schema does not allow struct default values > --- > > Key: KAFKA-14716 > URL: https://issues.apache.org/jira/browse/KAFKA-14716 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > The ConnectSchema API should allow specifying a composite (struct) default > value for a field, but with the current API, it is impossible to do so. > # There is a circular dependency between creating a struct as a default > value and creating the schema which holds it as the default value. The Struct > constructor expects a Schema object, and the default value setter of > SchemaBuilder checks schema conformity by using the > ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This > can only be bypassed if the struct references a SchemaBuilder instance, and > defaultValue is called on that builder instance, but this goes against the > Struct docs stating that "Struct objects must specify a complete \{@link > Schema} up front". > # ConnectSchema.equals is not prepared to be used with other Schema > implementations, so equals checks between ConnectSchema and SchemaBuilder > instances will always fail. This is only causing an issue if equals has to be > used for schema conformity checks. 
> Code examples: > Working code (mind that the schema referenced by the Struct is a > SchemaBuilder, and it is mutated after the Struct is constructed): > {code:java} > @Test > public void testCompositeDefault() { > SchemaBuilder nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > nestedSchema > .defaultValue(nestedDefault) > .build() > ) > .build(); > } {code} > Not working code (but better aligned with the current API and docs - 2 > separate Schema instances used by the Struct and the field, only diff is the > default value between the 2): > {code:java} > @Test > public void testCompositeDefault() { > Schema nestedSchema = SchemaBuilder.struct() > .field("bar", Schema.STRING_SCHEMA) > .build(); > Struct nestedDefault = new Struct(nestedSchema); > nestedDefault.put("bar", "default_value"); > Schema schema = SchemaBuilder.struct() > .field("foo", > SchemaBuilder > .struct() > .field("bar", Schema.STRING_SCHEMA) > .defaultValue(nestedDefault) > .build() > ) > .build(); > }{code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (KAFKA-14716) Connect schema does not allow struct default values
Daniel Urban created KAFKA-14716:

Summary: Connect schema does not allow struct default values
Key: KAFKA-14716
URL: https://issues.apache.org/jira/browse/KAFKA-14716
Project: Kafka
Issue Type: Bug
Reporter: Daniel Urban
Assignee: Daniel Urban

The ConnectSchema API should allow specifying a composite (struct) default value for a field, but with the current API, it is impossible to do so.
# There is a circular dependency between creating a struct as a default value and creating the schema which holds it as the default value. The Struct constructor expects a Schema object, and the default value setter of SchemaBuilder checks schema conformity by using ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can only be bypassed if the struct references a SchemaBuilder instance, and defaultValue is called on that builder instance, but this goes against the Struct docs stating that "Struct objects must specify a complete {@link Schema} up front".
# ConnectSchema.equals is not prepared to be used with other Schema implementations, so equals checks between ConnectSchema and SchemaBuilder instances will always fail. This only causes an issue if equals has to be used for schema conformity checks.
[jira] [Created] (KAFKA-14667) Delayed leader election operation gets stuck in purgatory
Daniel Urban created KAFKA-14667:

Summary: Delayed leader election operation gets stuck in purgatory
Key: KAFKA-14667
URL: https://issues.apache.org/jira/browse/KAFKA-14667
Project: Kafka
Issue Type: Bug
Affects Versions: 3.1.1
Reporter: Daniel Urban

This was observed with Kafka 3.1.1, but I believe that the latest versions are also affected.

In the Cruise Control project, there is an integration test: com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle

On our infrastructure, this test fails roughly every 20th run with a timeout - the triggered preferred leadership election is never completed. After some investigation, it turns out that:
# The admin client never gets a response from the broker.
# The leadership change is executed successfully.
# The ElectLeader purgatory never gets an update for the relevant topic partition.

A few relevant lines from a failed run (this test uses an embedded cluster, logs are mixed):

CC successfully sends a preferred election request to the controller (broker 0), topic1-0 needs a leadership change from broker 0 to broker 1:
{code:java}
2023-02-01 01:20:26.028 [controller-event-thread] DEBUG kafka.controller.KafkaController - [Controller id=0] Waiting for any successful result for election type (PREFERRED) by AdminClientTriggered for partitions: Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, message=Leader election not needed for topic partition.)))
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
{code}
The delayed operation for the leader election is triggered 2 times in quick succession (yes, same ms in both logs):
{code:java}
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: HashMap(topic1-0 -> 1)
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG kafka.server.DelayedElectLeader -
tryComplete() waitingPartitions: HashMap(topic1-0 -> 1){code} Shortly after (few ms later based on the logs), broker 0 receives an UpdateMetadataRequest from the controller (itself) and processes it: {code:java} 2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19) and timeout 3 to node 0: UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='topic1', topicId=gkFP8VnkSGyEf_LBBZSowQ, partitionStates=[UpdateMetadataPartitionState(topicName='topic1', partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', listener='PLAINTEXT', securityProtocol=0)], rack=null)]) 2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] Received UPDATE_METADATA response from node 0 for request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, correlationId=19): UpdateMetadataResponseData(errorCode=0) 2023-02-01 01:20:26.035 [data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG kafka.request.logger - Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}} {code} The update metadata
[jira] [Created] (KAFKA-14653) MM2 should delay resolving config provider references
Daniel Urban created KAFKA-14653:

Summary: MM2 should delay resolving config provider references
Key: KAFKA-14653
URL: https://issues.apache.org/jira/browse/KAFKA-14653
Project: Kafka
Issue Type: Sub-task
Reporter: Daniel Urban
Assignee: Daniel Urban

MM2 eagerly resolves config providers, meaning that the generated Connector configurations contain inline values. This is a problem for sensitive and host-specific configurations, so MM2 should delay the resolution of such configs.
[jira] [Created] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context
Daniel Urban created KAFKA-14652:

Summary: Improve MM2 logging by adding the flow information to the context
Key: KAFKA-14652
URL: https://issues.apache.org/jira/browse/KAFKA-14652
Project: Kafka
Issue Type: Improvement
Reporter: Daniel Urban

MirrorMaker2 runs multiple Connect worker instances in a single process. In Connect, the logging is based on the assumption that Connector names are unique. But in MM2, the same Connector names are used in each flow (Connect group). This means that there is no way to differentiate between the logs of MirrorSourceConnector in A->B and in B->A.

This can be improved by adding the flow to the logging context. Additionally, the client.id of the Kafka clients used by the MM2 Connectors should also be set explicitly, with the flow information added.
[jira] [Created] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout
Daniel Urban created KAFKA-14053:

Summary: Transactional producer should bump the epoch when a batch encounters delivery timeout
Key: KAFKA-14053
URL: https://issues.apache.org/jira/browse/KAFKA-14053
Project: Kafka
Issue Type: Bug
Reporter: Daniel Urban
Assignee: Daniel Urban

When a batch fails due to delivery timeout, it is possible that the batch is still in-flight. Due to underlying infrastructure issues, it is possible that an EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight batch is processed on the leader. This can cause transactional batches to be appended to the log after the corresponding abort marker. This can cause the LSO to be blocked indefinitely in the partition, or can even violate processing guarantees, as the out-of-order batch can become part of the next transaction.
[jira] [Created] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException
Daniel Urban created KAFKA-13970:

Summary: TopicAdmin topic creation should be retried on TimeoutException
Key: KAFKA-13970
URL: https://issues.apache.org/jira/browse/KAFKA-13970
Project: Kafka
Issue Type: Improvement
Components: KafkaConnect
Reporter: Daniel Urban

org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the case when there aren't enough brokers in the cluster to create a topic with the expected replication factor. This logic should also handle the case when there are 0 brokers in the cluster, and should retry in that case.
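The retry behavior requested in KAFKA-13970 can be sketched as a generic bounded retry loop. This is not the TopicAdmin implementation: the interface and method names are hypothetical, and java.util.concurrent.TimeoutException is used as a stand-in for Kafka's own TimeoutException so that the example has no dependencies.

```java
import java.util.concurrent.TimeoutException;

// Generic sketch; not TopicAdmin code. All names are assumptions.
final class RetryOnTimeoutSketch {

    // An action that may time out, e.g. a topic creation request.
    interface TimeoutProne<T> {
        T call() throws TimeoutException;
    }

    // Calls the action until it succeeds or maxAttempts is exhausted,
    // sleeping backoffMs between attempts.
    static <T> T callWithRetry(TimeoutProne<T> action, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        TimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (TimeoutException e) {
                last = e;
                if (attempt < maxAttempts) {
                    try {
                        Thread.sleep(backoffMs);
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("interrupted while retrying", interrupted);
                    }
                }
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts", last);
    }
}
```

A caller would wrap the topic creation call in the action and pick the attempt limit and backoff from its own timeout budget.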
[jira] [Resolved] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
[ https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Urban resolved KAFKA-13809.
Resolution: Won't Fix

Connector configs are used for consumer/producer overrides, converter classes, etc. Not necessary to propagate all configs to tasks.

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
> Issue Type: Improvement
> Components: KafkaConnect
> Reporter: Daniel Urban
> Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as producer/consumer overrides.
> This causes an issue even when used for testing purposes.
[jira] [Created] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
Daniel Urban created KAFKA-13809:

Summary: FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks
Key: KAFKA-13809
URL: https://issues.apache.org/jira/browse/KAFKA-13809
Project: Kafka
Issue Type: Improvement
Components: KafkaConnect
Reporter: Daniel Urban

The 2 example connectors do not propagate the full connector configuration to the tasks. This makes it impossible to override built-in configs, such as key.converter.

This causes an issue even when used for testing purposes.
[jira] [Created] (KAFKA-13756) Connect validate endpoint should return proper response on name and connector class error
Daniel Urban created KAFKA-13756:

Summary: Connect validate endpoint should return proper response on name and connector class error
Key: KAFKA-13756
URL: https://issues.apache.org/jira/browse/KAFKA-13756
Project: Kafka
Issue Type: Improvement
Reporter: Daniel Urban

Currently, if there is an issue with the connector name or the connector class, the validate endpoint returns a 500 response. Instead, it should return a well-formatted response containing proper validation error messages.
[jira] [Created] (KAFKA-13459) MM2 should be able to add the source offset to the record header
Daniel Urban created KAFKA-13459:

Summary: MM2 should be able to add the source offset to the record header
Key: KAFKA-13459
URL: https://issues.apache.org/jira/browse/KAFKA-13459
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Reporter: Daniel Urban

MM2 could add the source offset to the record header to help with diagnostics in some use cases.
[jira] [Created] (KAFKA-13452) MM2 creates invalid checkpoint when offset mapping is not available
Daniel Urban created KAFKA-13452:

Summary: MM2 creates invalid checkpoint when offset mapping is not available
Key: KAFKA-13452
URL: https://issues.apache.org/jira/browse/KAFKA-13452
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Reporter: Daniel Urban

MM2 checkpointing reads the offset-syncs topic to create offset mappings for committed consumer group offsets. In some corner cases, it is possible that a mapping is not available in offset-syncs - in that case, MM2 simply copies the source offset, which might not be a valid offset in the replica topic at all.

One possible situation is if there is an empty topic in the source cluster with a non-zero end offset (e.g. retention already removed the records), and a consumer group which has a committed offset set to the end offset. If replication is configured to start replicating this topic, it will not have an offset mapping available in offset-syncs (as the topic is empty), causing MM2 to copy the source offset.

This can cause issues when auto offset sync is enabled, as the consumer group offset can potentially be set to a high number. MM2 never rewinds these offsets, so even when a correct offset mapping becomes available, the offset will not be updated correctly.
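The difference between copying the source offset and refusing to translate (KAFKA-13452) can be sketched as follows. This is a simplified model, not the MM2 checkpointing code: the in-memory map of syncs, the class name and the translation formula are assumptions, and skipping the checkpoint when no mapping exists is just one possible handling.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

// Simplified model of offset translation; not MM2 code.
final class CheckpointSketch {

    // syncs maps an upstream offset to the matching downstream offset.
    // Returns empty when no sync at or below the committed offset exists,
    // instead of copying the upstream offset verbatim.
    static OptionalLong translate(NavigableMap<Long, Long> syncs, long upstreamCommitted) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamCommitted);
        if (sync == null) {
            return OptionalLong.empty();
        }
        long distance = upstreamCommitted - sync.getKey();
        return OptionalLong.of(sync.getValue() + distance);
    }

    public static void main(String[] args) {
        NavigableMap<Long, Long> syncs = new TreeMap<>();
        // Empty source topic with a high end offset: nothing to translate.
        System.out.println(translate(syncs, 5000L).isPresent());
        syncs.put(100L, 10L);
        System.out.println(translate(syncs, 105L).getAsLong());
    }
}
```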
[jira] [Created] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connector config
Daniel Urban created KAFKA-13311:

Summary: MM2 should allow propagating arbitrary global configurations to the Connector config
Key: KAFKA-13311
URL: https://issues.apache.org/jira/browse/KAFKA-13311
Project: Kafka
Issue Type: Improvement
Components: mirrormaker
Affects Versions: 2.8.1
Reporter: Daniel Urban
Assignee: Daniel Urban

Currently, the configuration propagation logic in MM2 only allows a handful of configurations to be applied to all Connector configs managed by MM2. In some cases (e.g. custom topic or group filters, metric reporters, etc.) it would be useful to be able to define configurations "globally", without prefixing the config with each replication. E.g. the "connectors." prefix could be used to declare global Connector configs.
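A sketch of how the "connectors." prefix proposed in KAFKA-13311 could be resolved. The class and method names are hypothetical, and the precedence rule (a flow-specific value overrides the global one) is an assumption of this example rather than something the issue states.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical prefix resolution; not the MM2 configuration code.
final class GlobalConfigSketch {

    // Prefix suggested in the issue for global Connector configs.
    static final String GLOBAL_PREFIX = "connectors.";

    // Builds the Connector config for one flow, e.g. flowPrefix "A->B.".
    // Global entries are applied first, flow-specific entries override them.
    static Map<String, String> connectorConfig(Map<String, String> mm2Props, String flowPrefix) {
        Map<String, String> result = new HashMap<>();
        copyWithPrefix(mm2Props, GLOBAL_PREFIX, result);
        copyWithPrefix(mm2Props, flowPrefix, result);
        return result;
    }

    // Copies every entry whose key starts with prefix, stripping the prefix.
    private static void copyWithPrefix(Map<String, String> source, String prefix,
                                       Map<String, String> target) {
        for (Map.Entry<String, String> entry : source.entrySet()) {
            if (entry.getKey().startsWith(prefix)) {
                target.put(entry.getKey().substring(prefix.length()), entry.getValue());
            }
        }
    }
}
```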
[jira] [Resolved] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters
[ https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Urban resolved KAFKA-13253.
Resolution: Duplicate

Same issue as KAFKA-9747 - that one already has a fix under review

> Kafka Connect losing task (re)configuration when connector name has special characters
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.7.1
> Reporter: David Dufour
> Priority: Major
>
> When not leader, DistributedHerder.reconfigureConnector() forwards the task configuration to the leader as follows:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the connector name contains some special characters, such as '<', '>'..., they need to be URL-encoded appropriately; otherwise an uncaught exception is raised in RestClient and the forward is lost.
> Here is the kind of exception we can catch by adding the necessary code in RestClient:
> {quote}java.lang.IllegalArgumentException: Illegal character in path at index 51: http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
> {quote}
> An additional catch() should be added in RestClient.httpRequest(), here:
> {quote}{{catch (IOException | InterruptedException | TimeoutException | ExecutionException e) {}}
> log.error("IO error forwarding REST request: ", e);
> {{throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, "IO Error trying to forward REST request: " + e.getMessage(), e);}}
> {{} finally {}}
> {quote}
> to catch all other exceptions, because without it, this kind of problem is completely silent.
> To reproduce:
> * start 2 Kafka clusters
> * start a Kafka Connect cluster (distributed) with at least 2 nodes
> * start a HeartbeatConnector with the name "cluster1->cluster2"
> If the node which generated the task is not the leader (not systematic), it will forward the creation to the leader and it will be lost. As a result, the connector will stay in RUNNING state but without any task.
> The problem is not easy to reproduce; it is important to start with empty Connect topics to reproduce it more easily.
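The failure quoted in KAFKA-13253 comes from building a URI out of a name containing '>'. The sketch below, using only the JDK, shows the unencoded URL being rejected and an encoded variant being accepted. The helper name is hypothetical, and the encoding approach (URLEncoder plus replacing '+' with "%20", since URLEncoder produces form encoding) is an assumption for illustration; it is not claimed to be the fix made under KAFKA-9747. It needs Java 10 or later for the Charset overload of URLEncoder.encode.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// JDK-only sketch; not the Connect RestClient code.
final class ConnectorUrlSketch {

    // Builds the task reconfiguration URL with the connector name encoded.
    static URI tasksUrl(String leaderUrl, String connName) {
        String encoded = URLEncoder.encode(connName, StandardCharsets.UTF_8)
                .replace("+", "%20"); // URLEncoder encodes spaces as '+'
        return URI.create(leaderUrl + "/connectors/" + encoded + "/tasks");
    }

    // Returns true if URI.create rejects the raw, unencoded URL.
    static boolean rawUrlIsRejected(String leaderUrl, String connName) {
        try {
            URI.create(leaderUrl + "/connectors/" + connName + "/tasks");
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        String leader = "http://localhost:8083";
        String name = "cluster1->cluster2";
        System.out.println("raw URL rejected: " + rawUrlIsRejected(leader, name));
        System.out.println("encoded URL: " + tasksUrl(leader, name));
    }
}
```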
[jira] [Resolved] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode
[ https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-12664. -- Resolution: Not A Bug > Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 > clusters in standalone mode > - > > Key: KAFKA-12664 > URL: https://issues.apache.org/jira/browse/KAFKA-12664 > Project: Kafka > Issue Type: Bug > Components: mirrormaker >Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0 >Reporter: Edward Vaisman >Assignee: Daniel Urban >Priority: Major > Attachments: connect.log.tar.gz, docker-compose-multi.yml, > mm2.properties > > > Hi Folks, I came across this issue when trying to aggregate data from two > separate data centres into one data centre. > In the configuration below, you can see I am trying to replicate a topic from > dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 > (test_topic_dc2) to dc3. > However, when I try to replicate both topics from those datacenters at the > same time I notice that connect gets stuck in a rebalance loop (see > attachment for logs) > [^connect.log.tar.gz] > excerpt of connect.log > {code:java} > [2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] > Successfully synced group in generation Generation{generationId=347, > memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > protocol='sessioned'} > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756) > [2021-04-13 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225) > [2021-04-13 17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540) > [2021-04-13 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance > started > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225) > [2021-04-13 17:03:06,369] INFO 
[Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining > group > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540) > [2021-04-13 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group > at generation 347 with protocol version 2 and got assignment: > Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', > leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], > taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with > rebalance delay: 0 > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688) > {code} > To replicate the issue, here is what I used: > [^mm2.properties] > {code:java} > clusters = dc1, dc2, dc3 > dc1.bootstrap.servers = kafka-dc1:19092 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc1.group.id=mm2-dc1 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc1->dc3.enabled = true > dc1->dc3.topics = test_topic_dc1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > dc3->dc2 = false > dc3->dc1 = false > {code} > I used this [^docker-compose-multi.yml] file to create local Kafka clusters > (dc1,dc2,dc3) > (I set docker to use 6 cpus, 8gb mem, swap 2gb) > I then ran an interactive shell to run mirror maker within the same > docker-compose network (change network to match yours) > {code:java} > docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash > # Upload mm2 properties on server > /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code} > Kafkacat commands to produce to dc1, dc2 > {code:java} > kafkacat -b localhost:9092 -t test_topic_dc1 -P > Hello World from DC1!{code} > {code:java} > kafkacat -b localhost:9093 -t 
test_topic_dc2 -P > Hello World from DC2{code} > I then tried to remove one of the datacenters to check whether it was a > configuration problem; however, mirror maker ran successfully with the > configuration below > mm2.properties > {code:java} > clusters = dc2, dc3 > dc2.bootstrap.servers = kafka-dc2:19093 > dc3.bootstrap.servers = kafka-dc3:19094 > dc2.group.id=mm2-dc2 > dc3.group.id=mm2-dc3 > replication.factor=1 > checkpoints.topic.replication.factor=1 > heartbeats.topic.replication.factor=1 > offset-syncs.topic.replication.factor=1 > offset.storage.replication.factor=1 > status.storage.replication.factor=1 > config.storage.replication.factor=1 > dc2->dc3.enabled = true > dc2->dc3.topics = test_topic_dc2 > {code
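As a reading aid for the mm2.properties files quoted above, here is a minimal sketch that lists which source->target flows a properties file enables. It is not MirrorMaker code: the class Mm2FlowsSketch and the method enabledFlows are hypothetical names, and the only thing assumed about MM2 is the "source->target.enabled" key pattern that appears in the report. It says nothing about why the rebalance loop occurred.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.TreeSet;

// Hypothetical illustration class; not part of MirrorMaker 2.
class Mm2FlowsSketch {

    private static final String SUFFIX = ".enabled";

    // Return the "source->target" names whose ".enabled" property is true,
    // sorted alphabetically so the result is deterministic.
    static List<String> enabledFlows(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        List<String> flows = new ArrayList<>();
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            boolean isFlowKey = key.contains("->") && key.endsWith(SUFFIX);
            if (isFlowKey && Boolean.parseBoolean(props.getProperty(key).trim())) {
                flows.add(key.substring(0, key.length() - SUFFIX.length()));
            }
        }
        return flows;
    }

    public static void main(String[] args) {
        String text = String.join("\n",
                "clusters = dc1, dc2, dc3",
                "dc1->dc3.enabled = true",
                "dc1->dc3.topics = test_topic_dc1",
                "dc2->dc3.enabled = true",
                "dc2->dc3.topics = test_topic_dc2");
        System.out.println(enabledFlows(text));
    }
}
```

Keys without the ".enabled" suffix, such as the "dc3->dc2 = false" entry in the report, are ignored by this sketch.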
[jira] [Created] (KAFKA-10586) MirrorMaker 2.0 REST support
Daniel Urban created KAFKA-10586: Summary: MirrorMaker 2.0 REST support Key: KAFKA-10586 URL: https://issues.apache.org/jira/browse/KAFKA-10586 Project: Kafka Issue Type: Improvement Components: mirrormaker Reporter: Daniel Urban Assignee: Daniel Urban KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated MirrorMaker 2.0 cluster does not use the Connect REST API. This means that with specific workloads, the dedicated MM2 cluster can become unable to react to dynamic topic and group filter changes. (This occurs when, after a rebalance operation, the leader node has no MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is stopped on the leader, meaning it cannot detect config changes by itself. Followers still running the connector can detect config changes, but they cannot query the leader for config updates.) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Resolved] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
[ https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Urban resolved KAFKA-10414. -- Resolution: Not A Problem api-util is only a test dependency, not an issue. > Upgrade api-util dependency - CVE-2018-1337 > --- > > Key: KAFKA-10414 > URL: https://issues.apache.org/jira/browse/KAFKA-10414 > Project: Kafka > Issue Type: Bug >Reporter: Daniel Urban >Assignee: Daniel Urban >Priority: Major > > There is a dependency on org.apache.directory.api:api-util:1.0.0, which is > involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later. > This is a transitive dependency through the apacheds libs. > -Can be fixed by upgrading to at least version 2.0.0.AM25- > Since api-all is also a dependency, and there is a class collision between > api-all and newer versions of api-util, it is better to just upgrade api-util > to 1.0.2 -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Created] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337
Daniel Urban created KAFKA-10414: Summary: Upgrade api-util dependency - CVE-2018-1337 Key: KAFKA-10414 URL: https://issues.apache.org/jira/browse/KAFKA-10414 Project: Kafka Issue Type: Bug Reporter: Daniel Urban There is a dependency on org.apache.directory.api:api-util:1.0.0, which is involved in CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later. This is a transitive dependency through the apacheds libs. Can be fixed by upgrading to at least version 2.0.0.AM25 -- This message was sent by Atlassian Jira (v8.3.4#803005)