zhuming created KAFKA-19787:
-------------------------------

             Summary: Kafka server ClusterAuthorization doesn't support acks=-1 
for kafka-client version > 3.1.0
                 Key: KAFKA-19787
                 URL: https://issues.apache.org/jira/browse/KAFKA-19787
             Project: Kafka
          Issue Type: Bug
          Components: producer 
    Affects Versions: 3.1.1
            Reporter: zhuming


*  Kafka server version is 2.5.1
 *  kafka-client version is greater than 3.1.1

 
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import static com.mediav.utils.ThriftUtils.decodeB64IfNeeded;

/**
 * Minimal reproduction case for this report: builds a {@link KafkaProducer}
 * with SASL_PLAINTEXT / SCRAM-SHA-256 settings and {@code acks=-1}, sends one
 * record to a fixed topic, and prints either the record metadata or the
 * failure's stack trace.
 *
 * <p>Several statements and string literals below are split across lines by
 * the hard line wrapping of the mail archive; rejoin them before compiling.
 *
 * <p>NOTE(review): kafka-clients 3.x is documented to enable idempotence by
 * default when acks=all (-1). Presumably the idempotent producer then needs
 * the IdempotentWrite permission on the cluster resource when talking to a
 * 2.5.1 broker, which would explain the ClusterAuthorizationException seen
 * only with acks=-1 -- confirm, e.g. by setting enable.idempotence=false.
 */
public class KafkaSendTest {
  // Sample payloads; only logs[0] is used, after passing through
  // decodeB64IfNeeded (presumably Base64-encoded data -- confirm).
  public static String[] logs = {
          "CgBlADDccNG4nuMMAGYLAAEAAAAXNjUyMjg",
  };
  /**
   * Configures the producer, sends a single record and closes the producer.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    Properties props = new Properties();
    // Broker endpoints and byte[] serializers for both key and value.
    props.put("bootstrap.servers", 
"kafka.shrdt.adsys.shrdt.qihoo.net:9092,kafka.shrdt2.adsys.shrdt.qihoo.net:9092");
    props.put("key.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", 
"org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("transaction.timeout.ms", "300000");
    // The setting under test: per the reporter, only acks=-1 fails.
    props.put("acks", "-1"); // If acks=1 or acks=0 it will send successfully
    props.put("compression.type", "lz4");
    // SASL/SCRAM authentication settings.
    // NOTE(review): the credentials are inlined exactly as posted in the
    // report; presumably they should be redacted/rotated -- confirm.
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    props.put("sasl.jaas.config", 
"org.apache.kafka.common.security.scram.ScramLoginModule required 
username=\"casino-test\" password=\"%8BKD7k^tb+HMWe1B~$x\";");

    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);

    try {
      String topic = "testdsp.charge.6.cpmshow";
      byte[] value = decodeB64IfNeeded(logs[0]);
      // The record is created with a null key.
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, 
value);
      // Completion callback: on success print the record and its
      // partition/offset, otherwise print the exception's stack trace.
      producer.send(record, (metadata, exception) -> {
        if (exception == null) {
          System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, 
offset=%d)\n",
                  record.key(), new String(record.value()), 
metadata.partition(), metadata.offset());
        } else {
          exception.printStackTrace();
        }
      });
      // Only reached when send() did not throw; an exception thrown above
      // skips close() and is printed by the catch block below.
      producer.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
} {code}
pom configuration:

 
{code:java}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
</dependency> {code}
         When the Kafka producer is configured with acks=-1, it throws an exception:

 
{code:java}
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. {code}
          If acks=1 or acks=0, the record is sent successfully:
{code:java}
Sent record(key=null ) meta(partition=6, offset=321496) {code}
    acks=-1 is just a parameter; how does it affect cluster authorization for 
the Kafka producer?
    Is this a bug, or is it intended behavior?
 
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to