zhuming created KAFKA-19787:
-------------------------------
Summary: Kafka server ClusterAuthorization doesn't support acks=-1
for kafka-client version > 3.1.0
Key: KAFKA-19787
URL: https://issues.apache.org/jira/browse/KAFKA-19787
Project: Kafka
Issue Type: Bug
Components: producer
Affects Versions: 3.1.1
Reporter: zhuming
* Kafka server version: 2.5.1
* kafka-clients version: newer than 3.1.1
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import static com.mediav.utils.ThriftUtils.decodeB64IfNeeded;

public class KafkaSendTest {
    // Base64-encoded sample payload
    public static String[] logs = {
        "CgBlADDccNG4nuMMAGYLAAEAAAAXNjUyMjg",
    };

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "kafka.shrdt.adsys.shrdt.qihoo.net:9092,kafka.shrdt2.adsys.shrdt.qihoo.net:9092");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("transaction.timeout.ms", "300000");
        props.put("acks", "-1"); // If acks=1 or acks=0 it will send successfully
        props.put("compression.type", "lz4");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"casino-test\" password=\"%8BKD7k^tb+HMWe1B~$x\";");

        Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
        try {
            String topic = "testdsp.charge.6.cpmshow";
            byte[] value = decodeB64IfNeeded(logs[0]);
            ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, value);
            // Asynchronous send; the callback reports success or failure
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                            record.key(), new String(record.value()),
                            metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
{code}
pom configuration:
{code:java}
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency> {code}
When the Kafka producer is configured with acks=-1, send() throws the following exception:
{code:java}
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed.
{code}
With acks=1 or acks=0, the record is sent successfully:
{code:java}
Sent record(key=null ) meta(partition=6, offset=321496) {code}
acks=-1 is just a producer parameter. How does it affect cluster authorization
for the Kafka producer?
Is this a bug or intended behavior?
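
One possible explanation, stated here as an assumption rather than a confirmed diagnosis: recent kafka-clients releases enable idempotence by default (enable.idempotence=true), which requires acks=all (-1). An idempotent producer has to obtain a producer ID from the broker, and brokers older than 2.8 appear to authorize that request with the IdempotentWrite permission on the Cluster resource, which would match the ClusterAuthorizationException above. Explicitly setting acks=1 or acks=0 makes the client fall back to a non-idempotent producer, so no such request is sent. The sketch below only builds the configuration with plain java.util.Properties; the class name IdempotenceWorkaround and the helper withoutIdempotence are hypothetical names used for illustration.

```java
import java.util.Properties;

public class IdempotenceWorkaround {

    // Hypothetical helper: returns a copy of the given producer properties
    // with idempotence explicitly disabled, leaving all other settings as-is.
    static Properties withoutIdempotence(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Assumption: with idempotence off, the client does not need to
        // request a producer ID, so no cluster-level permission is checked.
        props.put("enable.idempotence", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.put("acks", "-1"); // same acks setting as in the failing test
        Properties props = withoutIdempotence(base);
        System.out.println("acks=" + props.getProperty("acks")
                + " enable.idempotence=" + props.getProperty("enable.idempotence"));
    }
}
```

Passing the resulting Properties to new KafkaProducer<>(props) should keep acks=-1 while skipping the producer ID request. Granting the IdempotentWrite permission on the cluster to the producer's user might also resolve the error without a client change. Neither option has been verified against this cluster.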