[
https://issues.apache.org/jira/browse/KAFKA-19787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
zhuming updated KAFKA-19787:
----------------------------
Description:
* Kafka server version: 2.5.1
* kafka-clients version: newer than 3.1.1 (3.4.0 in this reproduction)
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaSendTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,xxxx:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("transaction.timeout.ms", "300000");
props.put("acks", "-1"); // If acks=1 or acks=0 it will send successfully
props.put("compression.type", "lz4");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required
username=\"xxx\" password=\"xxxx\";");
Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
try {
String topic = "topic1";
byte[] value = new byte[]{1,2};
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null,
value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Sent record(key=%s value=%s) meta(partition=%d,
offset=%d)\n",
record.key(), new String(record.value()),
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
} {code}
pom.xml dependency:
{code:xml}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency> {code}
When the Kafka producer is configured with acks=-1, send() throws the following exception:
{code:java}
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
{code}
With acks=1 or acks=0, the record is sent successfully:
{code:java}
Sent record(key=null ) meta(partition=6, offset=321496) {code}
acks=-1 is just a producer parameter. How does it affect cluster authorization
for the Kafka producer?
Is this a bug, or is it the expected behavior?
was:
* Kafka server version: 2.5.1
* kafka-clients version: newer than 3.1.1 (3.4.0 in this reproduction)
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import static com.mediav.utils.ThriftUtils.decodeB64IfNeeded;
public class KafkaSendTest {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "xxx:9092,xxxx:9092");
props.put("key.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer",
"org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("transaction.timeout.ms", "300000");
props.put("acks", "-1"); // If acks=1 or acks=0 it will send successfully
props.put("compression.type", "lz4");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "SCRAM-SHA-256");
props.put("sasl.jaas.config",
"org.apache.kafka.common.security.scram.ScramLoginModule required
username=\"xxx\" password=\"xxxx\";");
Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
try {
String topic = "topic1";
byte[] value = new byte[]{1,2};
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null,
value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Sent record(key=%s value=%s) meta(partition=%d,
offset=%d)\n",
record.key(), new String(record.value()),
metadata.partition(), metadata.offset());
} else {
exception.printStackTrace();
}
});
producer.close();
} catch (Exception e) {
e.printStackTrace();
}
}
} {code}
pom.xml dependency:
{code:xml}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency> {code}
When the Kafka producer is configured with acks=-1, send() throws the following exception:
{code:java}
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
{code}
With acks=1 or acks=0, the record is sent successfully:
{code:java}
Sent record(key=null ) meta(partition=6, offset=321496) {code}
acks=-1 is just a producer parameter. How does it affect cluster authorization
for the Kafka producer?
Is this a bug, or is it the expected behavior?
> Kafka server ClusterAuthorization doesn't support acks=-1 for kafka-client
> version>3.1.0
> --------------------------------------------------------------------------------------
>
> Key: KAFKA-19787
> URL: https://issues.apache.org/jira/browse/KAFKA-19787
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 3.1.1
> Reporter: zhuming
> Priority: Major
>
> * Kafka server version: 2.5.1
> * kafka-clients version: newer than 3.1.1 (3.4.0 in this reproduction)
>
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import java.util.Properties;
> public class KafkaSendTest {
> public static void main(String[] args) {
> Properties props = new Properties();
> props.put("bootstrap.servers", "xxx:9092,xxxx:9092");
> props.put("key.serializer",
> "org.apache.kafka.common.serialization.ByteArraySerializer");
> props.put("value.serializer",
> "org.apache.kafka.common.serialization.ByteArraySerializer");
> props.put("transaction.timeout.ms", "300000");
> props.put("acks", "-1"); // If acks=1 or acks=0 it will send successfully
> props.put("compression.type", "lz4");
> props.put("security.protocol", "SASL_PLAINTEXT");
> props.put("sasl.mechanism", "SCRAM-SHA-256");
> props.put("sasl.jaas.config",
> "org.apache.kafka.common.security.scram.ScramLoginModule required
> username=\"xxx\" password=\"xxxx\";");
> Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
> try {
> String topic = "topic1";
> byte[] value = new byte[]{1,2};
> ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic,
> null, value);
> producer.send(record, (metadata, exception) -> {
> if (exception == null) {
> System.out.printf("Sent record(key=%s value=%s) meta(partition=%d,
> offset=%d)\n",
> record.key(), new String(record.value()),
> metadata.partition(), metadata.offset());
> } else {
> exception.printStackTrace();
> }
> });
> producer.close();
> } catch (Exception e) {
> e.printStackTrace();
> }
> }
> } {code}
> pom.xml dependency:
>
> {code:xml}
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-clients</artifactId>
> <version>3.4.0</version>
> </dependency> {code}
> When the Kafka producer is configured with acks=-1, send() throws the following exception:
>
> {code:java}
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
>     at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
> {code}
> With acks=1 or acks=0, the record is sent successfully:
> {code:java}
> Sent record(key=null ) meta(partition=6, offset=321496) {code}
> acks=-1 is just a producer parameter. How does it affect cluster authorization
> for the Kafka producer?
> Is this a bug, or is it the expected behavior?
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)