[jira] [Created] (KAFKA-17534) Allow disabling heartbeats topic replication in MirrorSourceConnector

2024-09-12 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-17534:


 Summary: Allow disabling heartbeats topic replication in 
MirrorSourceConnector
 Key: KAFKA-17534
 URL: https://issues.apache.org/jira/browse/KAFKA-17534
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban
Assignee: Daniel Urban


Currently, MirrorSourceConnector always replicates the heartbeats topics. In 
some use-cases (e.g. multiple parallel, mutually exclusive 
MirrorSourceConnector instances in the same replication), users need to disable 
this behavior.

Add a new property to allow disabling the heartbeats replication.
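
A minimal sketch of how such a switch might gate topic selection. The property name heartbeats.replication.enabled and the class HeartbeatsAwareTopicSelector are illustrative assumptions, not names taken from this issue. The sketch also assumes the heartbeats topic is named "heartbeats", optionally prefixed with a cluster alias.

```java
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical helper: decides whether a topic should be replicated.
class HeartbeatsAwareTopicSelector {
    // Assumed property name, for illustration only.
    static final String HEARTBEATS_REPLICATION_ENABLED = "heartbeats.replication.enabled";

    private final boolean heartbeatsEnabled;
    private final Predicate<String> topicFilter;

    HeartbeatsAwareTopicSelector(Map<String, String> props, Predicate<String> topicFilter) {
        // Default to true so that the existing behavior is preserved.
        this.heartbeatsEnabled = Boolean.parseBoolean(
                props.getOrDefault(HEARTBEATS_REPLICATION_ENABLED, "true"));
        this.topicFilter = topicFilter;
    }

    static boolean isHeartbeatsTopic(String topic) {
        // Matches "heartbeats" and alias-prefixed forms such as "A.heartbeats".
        return topic.equals("heartbeats") || topic.endsWith(".heartbeats");
    }

    boolean shouldReplicate(String topic) {
        if (isHeartbeatsTopic(topic)) {
            // Heartbeats bypass the regular filter and follow the switch only.
            return heartbeatsEnabled;
        }
        return topicFilter.test(topic);
    }
}
```

With the switch left at its default, heartbeats are still replicated, so only instances that explicitly opt out change behavior.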



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (KAFKA-15372) MM2 rolling restart can drop configuration changes silently

2023-08-17 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-15372:


 Summary: MM2 rolling restart can drop configuration changes 
silently
 Key: KAFKA-15372
 URL: https://issues.apache.org/jira/browse/KAFKA-15372
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


When MM2 is restarted, it tries to update the Connector configuration in all 
flows. This is a one-time attempt, and it fails if the Connect worker is not the 
leader of the group.

In a distributed setup and with a rolling restart, it is possible that for a 
specific flow, the Connect worker of the just restarted MM2 instance is not the 
leader, meaning that Connector configurations can get dropped.

For example, assuming 2 MM2 instances, and one flow A->B:
 # MM2 instance 1 is restarted, the worker inside MM2 instance 2 becomes the 
leader of A->B Connect group.
 # MM2 instance 1 tries to update the Connector configurations, but fails 
(instance 2 has the leader, not instance 1)
 # MM2 instance 2 is restarted, leadership moves to worker in MM2 instance 1
 # MM2 instance 2 tries to update the Connector configurations, but fails

At this point, the configuration changes made before the restarts are never 
applied. This often happens silently, without any indication.
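
One possible mitigation, sketched under assumptions: instead of a single attempt, keep retrying the update until the local worker holds leadership. RetryingConfigUpdater, the leadership check and the update action are hypothetical stand-ins, not MM2 classes.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: retry a config update until the local worker is the
// leader, instead of giving up after a single attempt.
class RetryingConfigUpdater {
    private final BooleanSupplier isLeader; // assumed leadership check
    private final Runnable applyConfig;     // assumed config update action

    RetryingConfigUpdater(BooleanSupplier isLeader, Runnable applyConfig) {
        this.isLeader = isLeader;
        this.applyConfig = applyConfig;
    }

    // Returns true if the configuration was applied within maxAttempts.
    boolean updateWithRetry(int maxAttempts, long backoffMs) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (isLeader.getAsBoolean()) {
                applyConfig.run();
                return true;
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop retrying.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

A false return value gives the caller a place to log the dropped update, which addresses the "silent" part of the problem even when the retries are exhausted.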





[jira] [Created] (KAFKA-15075) MM2 internal checkpoints topic should support multiple partitions

2023-06-09 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-15075:


 Summary: MM2 internal checkpoints topic should support multiple 
partitions
 Key: KAFKA-15075
 URL: https://issues.apache.org/jira/browse/KAFKA-15075
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


Currently, the internal checkpoints topic of MM2 uses a single partition.

This is an unnecessary limitation, and instead, it should support more 
partitions.





[jira] [Created] (KAFKA-14978) ExactlyOnceWorkerSourceTask does not remove parent metrics

2023-05-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14978:


 Summary: ExactlyOnceWorkerSourceTask does not remove parent metrics
 Key: KAFKA-14978
 URL: https://issues.apache.org/jira/browse/KAFKA-14978
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Daniel Urban
Assignee: Daniel Urban


ExactlyOnceWorkerSourceTask removeMetrics does not invoke super.removeMetrics, 
meaning that only the transactional metrics are removed, and common source task 
metrics are not.
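
The pattern can be illustrated with simplified stand-ins. The classes below are not the real Connect classes; they only model the metric bookkeeping, and the metric names are made up.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for the parent task class.
class SourceTaskSketch {
    protected final Set<String> registered = new HashSet<>();

    SourceTaskSketch() {
        registered.add("source-task-metrics");
    }

    protected void removeMetrics() {
        registered.remove("source-task-metrics");
    }
}

// Simplified stand-in for the exactly-once subclass.
class ExactlyOnceSourceTaskSketch extends SourceTaskSketch {
    ExactlyOnceSourceTaskSketch() {
        registered.add("transaction-metrics");
    }

    @Override
    protected void removeMetrics() {
        registered.remove("transaction-metrics");
        // The call reported as missing: without it, the parent's metrics leak.
        super.removeMetrics();
    }
}
```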





[jira] [Created] (KAFKA-14903) MM2 Topic And Group Listener (KIP-918)

2023-04-13 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14903:


 Summary: MM2 Topic And Group Listener (KIP-918)
 Key: KAFKA-14903
 URL: https://issues.apache.org/jira/browse/KAFKA-14903
 Project: Kafka
  Issue Type: New Feature
Reporter: Daniel Urban
Assignee: Daniel Urban


MM2 has a dynamic topic and group filter mechanism, in which the replicated 
topics/groups can dynamically change, either due to changes in the available 
topics/groups, or changes in the filter settings.

In order to monitor the currently replicated topics/groups, MM2 should support 
a TopicListener and GroupListener plugin, which is triggered when MM2 changes 
the set of replicated topics/groups.
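
A rough sketch of what a topic listener could look like. The interface shape and the names TopicListenerSketch and ReplicatedTopicTracker are assumptions made for illustration; the actual plugin API is defined by KIP-918, not by this example.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical listener shape; the real plugin interface may differ.
interface TopicListenerSketch {
    void topicsChanged(Set<String> added, Set<String> removed);
}

// Tracks the replicated topic set and notifies the listener on changes only.
class ReplicatedTopicTracker {
    private Set<String> current = new HashSet<>();
    private final TopicListenerSketch listener;

    ReplicatedTopicTracker(TopicListenerSketch listener) {
        this.listener = listener;
    }

    void update(Set<String> latest) {
        Set<String> added = new TreeSet<>(latest);
        added.removeAll(current);
        Set<String> removed = new TreeSet<>(current);
        removed.removeAll(latest);
        if (!added.isEmpty() || !removed.isEmpty()) {
            listener.topicsChanged(added, removed);
        }
        current = new HashSet<>(latest);
    }
}
```

A group listener could follow the same shape, with consumer group IDs in place of topic names.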





[jira] [Created] (KAFKA-14902) KafkaBasedLog infinite retries can lead to StackOverflowError

2023-04-13 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14902:


 Summary: KafkaBasedLog infinite retries can lead to 
StackOverflowError
 Key: KAFKA-14902
 URL: https://issues.apache.org/jira/browse/KAFKA-14902
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Reporter: Daniel Urban
Assignee: Daniel Urban


KafkaBasedLog subclasses use an infinite retry on producer sends, using a 
callback. Sometimes, when specific errors are encountered, the callback is 
invoked in the send call, on the calling thread. If this happens enough times, 
a stack overflow happens.

Example stack trace from 2.5 (the newest code can hit the same problem):
{code:java}
2023-01-14 12:48:23,487 ERROR org.apache.kafka.connect.runtime.WorkerTask: WorkerSourceTask{id=MirrorSourceConnector-1} Task threw an uncaught and unrecoverable exception
java.lang.StackOverflowError: null
    at org.apache.kafka.common.metrics.stats.SampledStat.record(SampledStat.java:50)
    at org.apache.kafka.common.metrics.stats.Rate.record(Rate.java:60)
    at org.apache.kafka.common.metrics.stats.Meter.record(Meter.java:80)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:188)
    at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:178)
    at org.apache.kafka.clients.producer.internals.BufferPool.recordWaitTime(BufferPool.java:202)
    at org.apache.kafka.clients.producer.internals.BufferPool.allocate(BufferPool.java:147)
    at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:221)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:941)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238)
    at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298)
    ...
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:959)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.apache.kafka.connect.util.KafkaBasedLog.send(KafkaBasedLog.java:238)
    at org.apache.kafka.connect.storage.KafkaStatusBackingStore$4.onCompletion(KafkaStatusBackingStore.java:298){code}
Note the repeated KafkaProducer.send -> KafkaProducer.doSend -> 
KafkaStatusBackingStore$4.onCompletion calls, causing the issue.
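
The mechanism can be reproduced with a toy model. SyncFailingSender is a hypothetical stand-in for a producer whose send() invokes the callback on the calling thread when it fails fast; it is not the KafkaProducer API. The iterative variant assumes the callback always runs synchronously, so a real fix would also have to cover asynchronous completion, for example by handing the retry to another thread.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a producer that may invoke the callback
// synchronously, on the calling thread, when a send fails fast.
class SyncFailingSender {
    private int remainingFailures;

    SyncFailingSender(int failures) {
        this.remainingFailures = failures;
    }

    // Passes an exception to the callback while failures remain, null afterwards.
    void send(String record, Consumer<Exception> callback) {
        if (remainingFailures > 0) {
            remainingFailures--;
            callback.accept(new RuntimeException("send failed"));
        } else {
            callback.accept(null);
        }
    }
}

class RetrySketch {
    // Problematic pattern: retrying from inside the callback adds stack
    // frames for every synchronous failure.
    static void sendRecursive(SyncFailingSender sender, String record) {
        sender.send(record, error -> {
            if (error != null) {
                sendRecursive(sender, record);
            }
        });
    }

    // Alternative: the callback only records the outcome and the retry runs
    // in a loop, so the stack depth stays constant. Returns the attempt count.
    static int sendIterative(SyncFailingSender sender, String record) {
        int attempts = 0;
        boolean[] failed = new boolean[1];
        do {
            failed[0] = false;
            attempts++;
            sender.send(record, error -> failed[0] = error != null);
        } while (failed[0]);
        return attempts;
    }
}
```

With a large enough number of consecutive synchronous failures, sendRecursive exhausts the stack in the same way as the trace above, while sendIterative does not grow the stack at all.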





[jira] [Created] (KAFKA-14838) MM2 Connector/Task clients should specify client ID based on ID and role

2023-03-23 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14838:


 Summary: MM2 Connector/Task clients should specify client ID based 
on ID and role
 Key: KAFKA-14838
 URL: https://issues.apache.org/jira/browse/KAFKA-14838
 Project: Kafka
  Issue Type: Improvement
Reporter: Daniel Urban
Assignee: Daniel Urban


MM2 code creates a lot of Kafka clients internally. These clients generate a 
lot of logs, but since the client.id is not properly specified, connecting the 
dots between a specific Connector/Task and its internal client is close to 
impossible.

The client.id of such clients should specify the Connector name/Task ID, and 
should also specify the role of the client. E.g. MirrorSourceConnector uses 
multiple admin clients, and their client.id should reflect the difference 
between them. This will help log analysis significantly, especially in MM2 mode.
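
As an illustration only, a client.id could be composed as follows. The "|" separator, the role names and the MirrorClientIds helper are assumptions, not an existing MM2 convention.

```java
// Hypothetical helper that builds client.id values from the owner and role.
class MirrorClientIds {
    // Example roles (illustrative): "source-admin", "target-admin", "offset-sync-producer".
    static String forConnector(String connectorName, String role) {
        return connectorName + "|" + role;
    }

    static String forTask(String connectorName, int taskId, String role) {
        // Mirrors the "<connector>-<task>" form that appears in task log prefixes.
        return connectorName + "-" + taskId + "|" + role;
    }
}
```

For example, forTask("MirrorSourceConnector", 1, "source-admin") yields "MirrorSourceConnector-1|source-admin", which can be matched directly against the task ID in the worker logs.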





[jira] [Created] (KAFKA-14721) Kafka listener uses wrong login class

2023-02-15 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14721:


 Summary: Kafka listener uses wrong login class
 Key: KAFKA-14721
 URL: https://issues.apache.org/jira/browse/KAFKA-14721
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.2
Reporter: Daniel Urban


When trying to configure a single SASL_SSL listener with GSSAPI, Scram and 
OAuth support, we encounter an error at startup:
{code:java}
2023-02-15 13:26:04,250 ERROR kafka.server.KafkaServer: [main]: [KafkaServer 
id=104] Fatal error during KafkaServer startup. Prepare to shutdown
org.apache.kafka.common.KafkaException: java.lang.IllegalArgumentException: No 
serviceName defined in either JAAS or Kafka config
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:184)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.ChannelBuilders.serverChannelBuilder(ChannelBuilders.java:107)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.Processor.<init>(SocketServer.scala:861) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.SocketServer.newProcessor(SocketServer.scala:442) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$addDataPlaneProcessors$1(SocketServer.scala:299)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:190) 
~[scala-library-2.13.10.jar:?]
        at 
kafka.network.SocketServer.addDataPlaneProcessors(SocketServer.scala:297) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1(SocketServer.scala:262)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
kafka.network.SocketServer.$anonfun$createDataPlaneAcceptorsAndProcessors$1$adapted(SocketServer.scala:259)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:575) 
~[scala-library-2.13.10.jar:?]
        at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:573) 
~[scala-library-2.13.10.jar:?]
        at scala.collection.AbstractIterable.foreach(Iterable.scala:933) 
~[scala-library-2.13.10.jar:?]
        at 
kafka.network.SocketServer.createDataPlaneAcceptorsAndProcessors(SocketServer.scala:259)
 ~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.network.SocketServer.startup(SocketServer.scala:131) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.server.KafkaServer.startup(KafkaServer.scala:310) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at kafka.Kafka$.main(Kafka.scala:109) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1(Kafka.scala:107) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at 
com.cloudera.kafka.wrap.Kafka$.$anonfun$main$1$adapted(Kafka.scala:107) 
~[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.runMain(Kafka.scala:118) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka$.main(Kafka.scala:110) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
        at com.cloudera.kafka.wrap.Kafka.main(Kafka.scala) 
[kafka_2.13-3.1.2.7.1.9.0-15.jar:?]
Caused by: java.lang.IllegalArgumentException: No serviceName defined in either 
JAAS or Kafka config
        at 
org.apache.kafka.common.security.kerberos.KerberosLogin.getServiceName(KerberosLogin.java:309)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.kerberos.KerberosLogin.configure(KerberosLogin.java:92)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.authenticator.LoginManager.<init>(LoginManager.java:61)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.security.authenticator.LoginManager.acquireLoginManager(LoginManager.java:105)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        at 
org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:170)
 ~[kafka-clients-3.1.2.7.1.9.0-15.jar:?]
        ... 21 more{code}
Using the following configs in a Kafka broker:

jaas configuration file:
{code:java}
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required doNotPrompt=true 
useKeyTab=true storeKey=true serviceName="kafka" 
keyTab="/var/KAFKA_BROKER/kafka.keytab" principal="kafka/hgiovr@SITE";
org.apache.kafka.common.security.scram.ScramLoginModule required;
};{code}
and the following properties:
{code:java}
listener.name.sasl_ssl.sasl.enabled.mechanisms=GSSAPI,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARER
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule
 required;
listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuth

[jira] [Resolved] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-15 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-14716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-14716.
--
Resolution: Duplicate

> Connect schema does not allow struct default values
> ---
>
> Key: KAFKA-14716
> URL: https://issues.apache.org/jira/browse/KAFKA-14716
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> The ConnectSchema API should allow specifying a composite (struct) default 
> value for a field, but with the current API, it is impossible to do so.
>  # There is a circular dependency between creating a struct as a default 
> value and creating the schema which holds it as the default value. The Struct 
> constructor expects a Schema object, and the default value setter of 
> SchemaBuilder checks schema conformity by using the 
> ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This 
> can only be bypassed if the struct references a SchemaBuilder instance, and 
> defaultValue is called on that builder instance, but this goes against the 
> Struct docs stating that "Struct objects must specify a complete \{@link 
> Schema} up front".
>  # ConnectSchema.equals is not prepared to be used with other Schema 
> implementations, so equals checks between ConnectSchema and SchemaBuilder 
> instances will always fail. This is only causing an issue if equals has to be 
> used for schema conformity checks.
> Code examples:
> Working code (mind that the schema referenced by the Struct is a 
> SchemaBuilder, and it is mutated after the Struct is constructed):
> {code:java}
> @Test
> public void testCompositeDefault() {
>     SchemaBuilder nestedSchema = SchemaBuilder.struct()
>             .field("bar", Schema.STRING_SCHEMA);
>     Struct nestedDefault = new Struct(nestedSchema);
>     nestedDefault.put("bar", "default_value");
>     Schema schema = SchemaBuilder.struct()
>             .field("foo",
>                     nestedSchema
>                             .defaultValue(nestedDefault)
>                             .build()
>             )
>             .build();
> } {code}
> Not working code (but better aligned with the current API and docs - 2 
> separate Schema instances used by the Struct and the field, only diff is the 
> default value between the 2):
> {code:java}
> @Test
> public void testCompositeDefault() {
>     Schema nestedSchema = SchemaBuilder.struct()
>             .field("bar", Schema.STRING_SCHEMA)
>             .build();
>     Struct nestedDefault = new Struct(nestedSchema);
>     nestedDefault.put("bar", "default_value");
>     Schema schema = SchemaBuilder.struct()
>             .field("foo",
>                     SchemaBuilder
>                             .struct()
>                             .field("bar", Schema.STRING_SCHEMA)
>                             .defaultValue(nestedDefault)
>                             .build()
>             )
>             .build();
> }{code}





[jira] [Created] (KAFKA-14716) Connect schema does not allow struct default values

2023-02-14 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14716:


 Summary: Connect schema does not allow struct default values
 Key: KAFKA-14716
 URL: https://issues.apache.org/jira/browse/KAFKA-14716
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban
Assignee: Daniel Urban


The ConnectSchema API should allow specifying a composite (struct) default 
value for a field, but with the current API, it is impossible to do so.
 # There is a circular dependency between creating a struct as a default value 
and creating the schema which holds it as the default value. The Struct 
constructor expects a Schema object, and the default value setter of 
SchemaBuilder checks schema conformity by using the 
ConnectSchema.validateValue, which in turn uses ConnectSchema.equals. This can 
only be bypassed if the struct references a SchemaBuilder instance, and 
defaultValue is called on that builder instance, but this goes against the 
Struct docs stating that "Struct objects must specify a complete \{@link 
Schema} up front".
 # ConnectSchema.equals is not prepared to be used with other Schema 
implementations, so equals checks between ConnectSchema and SchemaBuilder 
instances will always fail. This is only causing an issue if equals has to be 
used for schema conformity checks.





[jira] [Created] (KAFKA-14667) Delayed leader election operation gets stuck in purgatory

2023-02-01 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14667:


 Summary: Delayed leader election operation gets stuck in purgatory
 Key: KAFKA-14667
 URL: https://issues.apache.org/jira/browse/KAFKA-14667
 Project: Kafka
  Issue Type: Bug
Affects Versions: 3.1.1
Reporter: Daniel Urban


This was observed with Kafka 3.1.1, but I believe that the latest versions are 
also affected.

In the Cruise Control project, there is an integration test: 
com.linkedin.kafka.cruisecontrol.executor.ExecutorTest#testReplicaReassignmentProgressWithThrottle

On our infrastructure, this test fails every ~20th run with a timeout - the 
triggered preferred leadership election is never completed. After some 
investigation, it turns out that:
 # The admin client never gets a response from the broker.
 # The leadership change is executed successfully.
 # The ElectLeader purgatory never gets an update for the relevant topic 
partition.

A few relevant lines from a failed run (this test uses an embedded cluster, 
logs are mixed):

CC successfully sends a preferred election request to the controller (broker 
0), topic1-0 needs a leadership change from broker 0 to broker 1:
{code:java}
2023-02-01 01:20:26.028 [controller-event-thread] DEBUG 
kafka.controller.KafkaController - [Controller id=0] Waiting for any successful 
result for election type (PREFERRED) by AdminClientTriggered for partitions: 
Map(topic1-0 -> Right(1), topic0-0 -> Left(ApiError(error=ELECTION_NOT_NEEDED, 
message=Leader election not needed for topic partition.)))
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1) {code}
The delayed operation for the leader election is triggered 2 times in quick 
succession (yes, same ms in both logs):
{code:java}
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1)
2023-02-01 01:20:26.031 [controller-event-thread] DEBUG 
kafka.server.DelayedElectLeader - tryComplete() waitingPartitions: 
HashMap(topic1-0 -> 1){code}
Shortly after (a few ms later, based on the logs), broker 0 receives an 
UpdateMetadataRequest from the controller (itself) and processes it:
{code:java}
2023-02-01 01:20:26.033 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Sending UPDATE_METADATA request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19) and timeout 3 to node 0: 
UpdateMetadataRequestData(controllerId=0, controllerEpoch=1, brokerEpoch=25, 
ungroupedPartitionStates=[], 
topicStates=[UpdateMetadataTopicState(topicName='topic1', 
topicId=gkFP8VnkSGyEf_LBBZSowQ, 
partitionStates=[UpdateMetadataPartitionState(topicName='topic1', 
partitionIndex=0, controllerEpoch=1, leader=1, leaderEpoch=2, isr=[0, 1], 
zkVersion=2, replicas=[1, 0], offlineReplicas=[])])], 
liveBrokers=[UpdateMetadataBroker(id=1, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=40236, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null), 
UpdateMetadataBroker(id=0, v0Host='', v0Port=0, 
endpoints=[UpdateMetadataEndpoint(port=42556, host='localhost', 
listener='PLAINTEXT', securityProtocol=0)], rack=null)])
2023-02-01 01:20:26.035 [Controller-0-to-broker-0-send-thread] DEBUG 
org.apache.kafka.clients.NetworkClient - [Controller id=0, targetBrokerId=0] 
Received UPDATE_METADATA response from node 0 for request with header 
RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=0, 
correlationId=19): UpdateMetadataResponseData(errorCode=0)
2023-02-01 01:20:26.035 
[data-plane-kafka-network-thread-0-ListenerName(PLAINTEXT)-PLAINTEXT-0] DEBUG 
kafka.request.logger - Completed 
request:{"isForwarded":false,"requestHeader":{"requestApiKey":6,"requestApiVersion":7,"correlationId":19,"clientId":"0","requestApiKeyName":"UPDATE_METADATA"},"request":{"controllerId":0,"controllerEpoch":1,"brokerEpoch":25,"topicStates":[{"topicName":"topic1","topicId":"gkFP8VnkSGyEf_LBBZSowQ","partitionStates":[{"partitionIndex":0,"controllerEpoch":1,"leader":1,"leaderEpoch":2,"isr":[0,1],"zkVersion":2,"replicas":[1,0],"offlineReplicas":[]}]}],"liveBrokers":[{"id":1,"endpoints":[{"port":40236,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null},{"id":0,"endpoints":[{"port":42556,"host":"localhost","listener":"PLAINTEXT","securityProtocol":0}],"rack":null}]},"response":{"errorCode":0},"connection":"127.0.0.1:42556-127.0.0.1:55952-0","totalTimeMs":1.904,"requestQueueTimeMs":0.108,"localTimeMs":0.788,"remoteTimeMs":0.0,"throttleTimeMs":0,"responseQueueTimeMs":0.842,"sendTimeMs":0.164,"securityProtocol":"PLAINTEXT","principal":"User:ANONYMOUS","listener":"PLAINTEXT","clientInformation":{"softwareName":"unknown","softwareVersion":"unknown"}}
 {code}
The update metadata

[jira] [Created] (KAFKA-14653) MM2 should delay resolving config provider references

2023-01-25 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14653:


 Summary: MM2 should delay resolving config provider references
 Key: KAFKA-14653
 URL: https://issues.apache.org/jira/browse/KAFKA-14653
 Project: Kafka
  Issue Type: Sub-task
Reporter: Daniel Urban
Assignee: Daniel Urban


MM2 eagerly resolves config providers, meaning that the generated Connector 
configurations contain inline values.

This is a problem for sensitive and host-specific configurations, so MM2 should 
delay the resolution of such configs.





[jira] [Created] (KAFKA-14652) Improve MM2 logging by adding the flow information to the context

2023-01-25 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14652:


 Summary: Improve MM2 logging by adding the flow information to the 
context
 Key: KAFKA-14652
 URL: https://issues.apache.org/jira/browse/KAFKA-14652
 Project: Kafka
  Issue Type: Improvement
Reporter: Daniel Urban


MirrorMaker2 runs multiple Connect worker instances in a single process. In 
Connect, the logging is based on the assumption that Connector names are 
unique. But in MM2, the same Connector names are being used in each flow 
(Connect group). This means that there is no way to differentiate between the 
logs of MirrorSourceConnector in A->B and in B->A.

This can be improved by adding the flow to the logging context.

Additionally, the client.id of the Kafka clients used by the MM2 Connectors 
should also be set explicitly, with the flow information added.





[jira] [Created] (KAFKA-14053) Transactional producer should bump the epoch when a batch encounters delivery timeout

2022-07-07 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-14053:


 Summary: Transactional producer should bump the epoch when a batch 
encounters delivery timeout
 Key: KAFKA-14053
 URL: https://issues.apache.org/jira/browse/KAFKA-14053
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban
Assignee: Daniel Urban


When a batch fails due to a delivery timeout, it is possible that the batch is 
still in-flight. Due to underlying infra issues, it is possible that an 
EndTxnRequest and a WriteTxnMarkerRequest are processed before the in-flight 
batch is processed on the leader. This can cause transactional batches to be 
appended to the log after the corresponding abort marker.

This can block the LSO in the partition indefinitely, or can even 
violate processing guarantees, as the out-of-order batch can become part of the 
next transaction.
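
A toy model of why an epoch bump helps, under the assumption that the abort is carried out with the bumped epoch and that the partition rejects batches carrying an older epoch. PartitionLogSketch is hypothetical and greatly simplifies the broker-side validation.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a partition log that fences stale producer epochs.
class PartitionLogSketch {
    private final List<String> entries = new ArrayList<>();
    private int latestEpoch = 0;

    // Appends a data batch; rejects it if the producer epoch is stale.
    boolean appendBatch(int producerEpoch, String payload) {
        if (producerEpoch < latestEpoch) {
            return false; // fenced: the batch belongs to an older epoch
        }
        latestEpoch = producerEpoch;
        entries.add("batch@" + producerEpoch + ":" + payload);
        return true;
    }

    // Writes an abort marker carrying the given epoch.
    void appendAbortMarker(int markerEpoch) {
        latestEpoch = Math.max(latestEpoch, markerEpoch);
        entries.add("abort@" + markerEpoch);
    }

    List<String> entries() {
        return entries;
    }
}
```

Without a bump, a late batch with epoch 0 is accepted after an abort marker with epoch 0, which is the situation described above. If the marker carries epoch 1, the same late batch is rejected.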





[jira] [Created] (KAFKA-13970) TopicAdmin topic creation should be retried on TimeoutException

2022-06-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13970:


 Summary: TopicAdmin topic creation should be retried on 
TimeoutException
 Key: KAFKA-13970
 URL: https://issues.apache.org/jira/browse/KAFKA-13970
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Daniel Urban


org.apache.kafka.connect.util.TopicAdmin#createTopicsWithRetry handles the case 
when there aren't enough brokers in the cluster to create a topic with the 
expected replication factor. This logic should also handle the case when there 
are 0 brokers in the cluster, and should retry in that case.
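
A generic sketch of the requested retry behavior. SketchTimeoutException is a hypothetical unchecked stand-in for the client's timeout exception, and callWithRetry is not the signature of TopicAdmin#createTopicsWithRetry.

```java
import java.util.function.Supplier;

// Hypothetical unchecked stand-in for the client's timeout exception.
class SketchTimeoutException extends RuntimeException {
    SketchTimeoutException(String message) {
        super(message);
    }
}

class TopicCreationRetry {
    // Retries the action while it fails with a timeout, up to maxAttempts.
    // Any other exception is propagated immediately.
    static <T> T callWithRetry(Supplier<T> action, int maxAttempts, long backoffMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        SketchTimeoutException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            } catch (SketchTimeoutException e) {
                last = e; // remember the failure and retry after the backoff
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        throw last;
    }
}
```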





[jira] [Resolved] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-04-08 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13809?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-13809.
--
Resolution: Won't Fix

Connector configs are used for consumer/producer overrides, converter classes, 
etc.

Not necessary to propagate all configs to tasks.

> FileStreamSinkConnector and FileStreamSourceConnector should propagate full 
> configuration to tasks
> --
>
> Key: KAFKA-13809
> URL: https://issues.apache.org/jira/browse/KAFKA-13809
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Daniel Urban
>Priority: Major
>
> The 2 example connectors do not propagate the full connector configuration to 
> the tasks. This makes it impossible to override built-in configs, such as 
> producer/consumer overrides.
> This causes an issue even when used for testing purposes.





[jira] [Created] (KAFKA-13809) FileStreamSinkConnector and FileStreamSourceConnector should propagate full configuration to tasks

2022-04-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13809:


 Summary: FileStreamSinkConnector and FileStreamSourceConnector 
should propagate full configuration to tasks
 Key: KAFKA-13809
 URL: https://issues.apache.org/jira/browse/KAFKA-13809
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Daniel Urban


The 2 example connectors do not propagate the full connector configuration to 
the tasks. This makes it impossible to override built-in configs, such as 
key.converter.

This causes an issue even when used for testing purposes.





[jira] [Created] (KAFKA-13756) Connect validate endpoint should return proper response on name and connector class error

2022-03-21 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13756:


 Summary: Connect validate endpoint should return proper response 
on name and connector class error
 Key: KAFKA-13756
 URL: https://issues.apache.org/jira/browse/KAFKA-13756
 Project: Kafka
  Issue Type: Improvement
Reporter: Daniel Urban


Currently, if there is an issue with the connector name or the connector class, 
the validate endpoint returns a 500 response.

Instead, it should return a well-formatted response containing proper 
validation error messages.





[jira] [Created] (KAFKA-13459) MM2 should be able to add the source offset to the record header

2021-11-17 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13459:


 Summary: MM2 should be able to add the source offset to the record 
header
 Key: KAFKA-13459
 URL: https://issues.apache.org/jira/browse/KAFKA-13459
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


MM2 could add the source offset to the record header to help with diagnostics 
in some use-cases.





[jira] [Created] (KAFKA-13452) MM2 creates invalid checkpoint when offset mapping is not available

2021-11-12 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13452:


 Summary: MM2 creates invalid checkpoint when offset mapping is not 
available
 Key: KAFKA-13452
 URL: https://issues.apache.org/jira/browse/KAFKA-13452
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban


MM2 checkpointing reads the offset-syncs topic to create offset mappings for 
committed consumer group offsets. In some corner cases, it is possible that a 
mapping is not available in offset-syncs - in that case, MM2 simply copies the 
source offset, which might not be a valid offset in the replica topic at all.

One possible situation is if there is an empty topic in the source cluster with 
a non-zero end offset (e.g. retention already removed the records), and a 
consumer group which has a committed offset set to the end offset. If 
replication is configured to start replicating this topic, it will not have an 
offset mapping available in offset-syncs (as the topic is empty), causing MM2 
to copy the source offset.

This can cause issues when auto offset sync is enabled, as the consumer group 
offset can be potentially set to a high number. MM2 never rewinds these 
offsets, so even when there is a correct offset mapping available, the offset 
will not be updated correctly.
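
A sketch of the safer alternative: when no mapping is known, produce no translated offset rather than copying the source offset. OffsetTranslatorSketch is hypothetical and much simpler than the MM2 offset sync store. As a conservative assumption, it returns the downstream offset of the closest sync at or below the committed offset.

```java
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

// Hypothetical translator backed by (upstream -> downstream) offset syncs.
class OffsetTranslatorSketch {
    private final TreeMap<Long, Long> syncs = new TreeMap<>();

    void recordSync(long upstreamOffset, long downstreamOffset) {
        syncs.put(upstreamOffset, downstreamOffset);
    }

    // Translates a committed upstream offset, or returns empty when no sync
    // at or below that offset is known.
    OptionalLong translate(long upstreamOffset) {
        Map.Entry<Long, Long> sync = syncs.floorEntry(upstreamOffset);
        if (sync == null) {
            // No mapping available: do not fall back to the raw source offset.
            return OptionalLong.empty();
        }
        return OptionalLong.of(sync.getValue());
    }
}
```

A caller that receives an empty result can skip the checkpoint for that partition, which avoids writing an offset that may not exist in the replica topic.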





[jira] [Created] (KAFKA-13311) MM2 should allow propagating arbitrary global configurations to the Connector config

2021-09-20 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-13311:


 Summary: MM2 should allow propagating arbitrary global 
configurations to the Connector config
 Key: KAFKA-13311
 URL: https://issues.apache.org/jira/browse/KAFKA-13311
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Affects Versions: 2.8.1
Reporter: Daniel Urban
Assignee: Daniel Urban


Currently, the configuration propagation logic in MM2 only allows a handful of 
configurations to be applied to all Connector configs managed by MM2.

In some cases (e.g. custom topic or group filters, metric reporters, etc.) it 
would be useful to be able to define configurations "globally", without 
prefixing the config with each replication.

E.g. the "connectors." prefix could be used to declare global Connector configs.
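
A minimal sketch of how such a prefix could be resolved. The precedence (replication-specific entries override global ones) and the GlobalConnectorConfigs helper are assumptions, not part of the proposal text.

```java
import java.util.HashMap;
import java.util.Map;

class GlobalConnectorConfigs {
    static final String GLOBAL_PREFIX = "connectors."; // prefix suggested in the issue

    // Builds a connector config: global "connectors." entries are applied
    // first, then entries prefixed with the flow (e.g. "A->B.") override them.
    static Map<String, String> connectorConfig(Map<String, String> mm2Props, String flow) {
        Map<String, String> result = new HashMap<>();
        copyWithPrefix(mm2Props, GLOBAL_PREFIX, result);
        copyWithPrefix(mm2Props, flow + ".", result);
        return result;
    }

    private static void copyWithPrefix(Map<String, String> source, String prefix,
                                       Map<String, String> target) {
        for (Map.Entry<String, String> e : source.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                // Strip the prefix so the connector sees the plain config name.
                target.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
    }
}
```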





[jira] [Resolved] (KAFKA-13253) Kafka Connect losing task (re)configuration when connector name has special characters

2021-08-30 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13253?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-13253.
--
Resolution: Duplicate

Same issue as KAFKA-9747 - that one already has a fix under review

> Kafka Connect losing task (re)configuration when connector name has special 
> characters
> --
>
> Key: KAFKA-13253
> URL: https://issues.apache.org/jira/browse/KAFKA-13253
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.7.1
>Reporter: David Dufour
>Priority: Major
>
> When the worker is not the leader, DistributedHerder.reconfigureConnector() 
> forwards the task configuration to the leader as follows:
> {quote}String reconfigUrl = RestServer.urlJoin(leaderUrl, "/connectors/" + 
> connName + "/tasks");
> log.trace("Forwarding task configurations for connector {} to leader", 
> connName);
> RestClient.httpRequest(reconfigUrl, "POST", null, rawTaskProps, null, config, 
> sessionKey, requestSignatureAlgorithm);
> {quote}
> The problem is that if the connector name contains special characters 
> such as '<' or '>', they need to be URL-encoded; otherwise an 
> uncaught exception is raised in RestClient and the forwarded request is lost.
> Here is the kind of exception that can be observed after adding the necessary 
> catch block in RestClient:
> {quote}java.lang.IllegalArgumentException: Illegal character in path at index 
> 51: 
> http://10.224.0.15:8083/connectors/mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector/tasks
> {quote}
> An additional catch() should be added in RestClient.httpRequest(), here:
> {quote}catch (IOException | InterruptedException | TimeoutException | 
> ExecutionException e) {
>     log.error("IO error forwarding REST request: ", e);
>     throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR, 
> "IO Error trying to forward REST request: " + e.getMessage(), e);
> } finally {
> {quote}
> to catch all other exceptions, because without it this kind of problem is 
> completely silent.
> To reproduce:
>  * start 2 Kafka clusters
>  * start a distributed Kafka Connect cluster with at least 2 nodes
>  * start a HeartbeatConnector with the name "cluster1->cluster2"
> If the node that generated the task is not the leader (which is not always 
> the case), it will forward the creation to the leader and it will be lost. 
> As a result, the connector will stay in RUNNING state but without any task.
> The problem is not easy to reproduce; starting with empty Connect 
> topics makes it easier to reproduce.
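
The failure quoted above can be reproduced outside Connect with the JDK alone. The sketch below is not the fix applied in Kafka: the class `ConnectorNameEncodingSketch` and the helper `tasksUrl` are hypothetical, and `URLEncoder` performs form-style encoding (for example, a space becomes `+`), which is good enough to show the idea but may not be the right encoder for every path segment.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: encode the connector name before building the forward URL.
final class ConnectorNameEncodingSketch {

    // Builds the task reconfiguration URL with the connector name percent-encoded.
    static String tasksUrl(String leaderUrl, String connName) {
        String encoded = URLEncoder.encode(connName, StandardCharsets.UTF_8);
        return leaderUrl + "/connectors/" + encoded + "/tasks";
    }

    public static void main(String[] args) {
        String leaderUrl = "http://10.224.0.15:8083";
        String connName = "mirror1-cluster->mirror2-cluster.MirrorHeartbeatConnector";

        // Unencoded: java.net.URI rejects '>' in the path with an IllegalArgumentException.
        try {
            URI.create(leaderUrl + "/connectors/" + connName + "/tasks");
        } catch (IllegalArgumentException e) {
            System.out.println("raw name rejected: " + e.getMessage());
        }

        // Encoded: '>' becomes %3E and the URI parses.
        System.out.println("encoded URL: " + URI.create(tasksUrl(leaderUrl, connName)));
    }
}
```

Encoding the name at the point where the URL is built removes the exception itself; the extra catch block suggested in the description would only make such failures visible.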





[jira] [Resolved] (KAFKA-12664) Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 clusters in standalone mode

2021-04-14 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12664?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-12664.
--
Resolution: Not A Bug

> Mirrormaker 2.0 infinite rebalance loop when dealing with more than 2 
> clusters in standalone mode
> -
>
> Key: KAFKA-12664
> URL: https://issues.apache.org/jira/browse/KAFKA-12664
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0, 2.4.1, 2.6.0, 2.7.0
>Reporter: Edward Vaisman
>Assignee: Daniel Urban
>Priority: Major
> Attachments: connect.log.tar.gz, docker-compose-multi.yml, 
> mm2.properties
>
>
> Hi Folks, I came across this issue when trying to aggregate data from two 
> separate data centres into one data centre.
> In the configuration below, you can see I am trying to replicate a topic from 
> dc1 (named test_topic_dc1) to dc3 as well as replicate a topic from dc2 
> (test_topic_dc2) to dc3.
> However, when I try to replicate both topics from those datacenters at the 
> same time I notice that connect gets stuck in a rebalance loop (see 
> attachment for logs)
>  [^connect.log.tar.gz]
> excerpt of connect.log
> {code:java}
> [2021-04-13 17:03:06,360] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Successfully synced group in generation Generation{generationId=347, memberId='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', protocol='sessioned'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:756)
> [2021-04-13 17:03:06,360] INFO [Worker clientId=connect-4, groupId=mm2-dc2] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)
> [2021-04-13 17:03:06,362] INFO [Worker clientId=connect-4, groupId=mm2-dc2] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
> [2021-04-13 17:03:06,368] INFO [Worker clientId=connect-2, groupId=mm2-dc3] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:225)
> [2021-04-13 17:03:06,369] INFO [Worker clientId=connect-2, groupId=mm2-dc3] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:540)
> [2021-04-13 17:03:06,370] INFO [Worker clientId=connect-3, groupId=mm2-dc2] Joined group at generation 347 with protocol version 2 and got assignment: Assignment{error=1, leader='connect-3-c59342c3-ca62-41cc-964c-41a0f98351c0', leaderUrl='NOTUSED/dc1', offset=13, connectorIds=[MirrorSourceConnector], taskIds=[], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder:1688)
> {code}
> To replicate the issue here is what I used:
> [^mm2.properties]
> {code:java}
> clusters = dc1, dc2, dc3
> dc1.bootstrap.servers = kafka-dc1:19092
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc1.group.id=mm2-dc1
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc1->dc3.enabled = true
> dc1->dc3.topics = test_topic_dc1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> dc3->dc2 = false
> dc3->dc1 = false
> {code}
> I used this [^docker-compose-multi.yml] file to create the local Kafka 
> clusters (dc1, dc2, dc3).
>  (I set Docker to use 6 CPUs, 8 GB memory, 2 GB swap)
> I then ran an interactive shell to run mirror maker within the same 
> docker-compose network (change network to match yours)
> {code:java}
> docker run --network kafka-examples_default -it wurstmeister/kafka:latest bash
> # Upload mm2 properties on server
> /opt/kafka/bin/connect-mirror-maker.sh mm2.properties{code}
> Kafkacat commands to produce to dc1, dc2
> {code:java}
> kafkacat -b localhost:9092 -t test_topic_dc1 -P
> Hello World from DC1!{code}
> {code:java}
> kafkacat -b localhost:9093 -t test_topic_dc2 -P
> Hello World from DC2{code}
> I then tried removing one of the datacenters to confirm whether it was a 
> configuration problem. However, MirrorMaker ran successfully with the 
> configuration below.
> mm2.properties
> {code:java}
> clusters = dc2, dc3
> dc2.bootstrap.servers = kafka-dc2:19093
> dc3.bootstrap.servers = kafka-dc3:19094
> dc2.group.id=mm2-dc2
> dc3.group.id=mm2-dc3
> replication.factor=1
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
> dc2->dc3.enabled = true
> dc2->dc3.topics = test_topic_dc2
> {code

[jira] [Created] (KAFKA-10586) MirrorMaker 2.0 REST support

2020-10-08 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-10586:


 Summary: MirrorMaker 2.0 REST support
 Key: KAFKA-10586
 URL: https://issues.apache.org/jira/browse/KAFKA-10586
 Project: Kafka
  Issue Type: Improvement
  Components: mirrormaker
Reporter: Daniel Urban
Assignee: Daniel Urban


KIP-382 introduced MirrorMaker 2.0. Because of scoping issues, the dedicated 
MirrorMaker 2.0 cluster does not utilize the Connect REST API. This means that 
with specific workloads, the dedicated MM2 cluster can become unable to react 
to dynamic topic and group filter changes.

(This occurs when, after a rebalance operation, the leader node has no 
MirrorSourceConnectorTasks. Because of this, the MirrorSourceConnector is 
stopped on the leader, meaning it cannot detect config changes by itself. 
Followers still running the connector can detect config changes, but they 
cannot send the resulting config updates to the leader.)





[jira] [Resolved] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-24 Thread Daniel Urban (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniel Urban resolved KAFKA-10414.
--
Resolution: Not A Problem

api-util is only a test dependency, not an issue.

> Upgrade api-util dependency - CVE-2018-1337
> ---
>
> Key: KAFKA-10414
> URL: https://issues.apache.org/jira/browse/KAFKA-10414
> Project: Kafka
>  Issue Type: Bug
>Reporter: Daniel Urban
>Assignee: Daniel Urban
>Priority: Major
>
> There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
> affected by CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.
> This is a transitive dependency through the apacheds libs.
> -Can be fixed by upgrading to at least version 2.0.0.AM25-
> Since api-all is also a dependency, and there is a class collision between 
> api-all and newer versions of api-util, it is better to just upgrade api-util 
> to 1.0.2.





[jira] [Created] (KAFKA-10414) Upgrade api-util dependency - CVE-2018-1337

2020-08-18 Thread Daniel Urban (Jira)
Daniel Urban created KAFKA-10414:


 Summary: Upgrade api-util dependency - CVE-2018-1337
 Key: KAFKA-10414
 URL: https://issues.apache.org/jira/browse/KAFKA-10414
 Project: Kafka
  Issue Type: Bug
Reporter: Daniel Urban


There is a dependency on org.apache.directory.api:api-util:1.0.0, which is 
affected by CVE-2018-1337. The issue is fixed in api-util 1.0.2 and later.

This is a transitive dependency through the apacheds libs. It can be fixed by 
upgrading to at least version 2.0.0.AM25.


