[ https://issues.apache.org/jira/browse/KAFKA-19787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

zhuming updated KAFKA-19787:
----------------------------
    Description: 
* Kafka broker version: 2.5.1
* kafka-clients version: newer than 3.1.1 (3.4.0 in the pom.xml below)

 
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

import static com.mediav.utils.ThriftUtils.decodeB64IfNeeded;

public class KafkaSendTest {
  public static String[] logs = {
          "CgBlADDccNG4nuMMAGYLAAEAAAAXNjUyMjg",
  };

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "xxx:9092,xxxx:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("transaction.timeout.ms", "300000");
    props.put("acks", "-1"); // with acks=1 or acks=0 the send succeeds
    props.put("compression.type", "lz4");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxxx\";");

    Producer<byte[], byte[]> producer = new KafkaProducer<>(props);

    try {
      String topic = "testdsp.charge.6.cpmshow";
      byte[] value = decodeB64IfNeeded(logs[0]);
      ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, value);
      producer.send(record, (metadata, exception) -> {
        if (exception == null) {
          System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
                  record.key(), new String(record.value()), metadata.partition(), metadata.offset());
        } else {
          exception.printStackTrace();
        }
      });
      producer.close();
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
{code}
The kafka-clients dependency in pom.xml:

 
{code:java}
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.4.0</version>
</dependency> {code}
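As a sanity check (not part of the original report, just a debugging aid): the kafka-clients jar that is actually resolved on the classpath can be confirmed at runtime via AppInfoParser, the helper the producer itself uses to log its version at startup. AppInfoParser lives in an internal package, so treat this only as a diagnostic sketch:
{code:java}
import org.apache.kafka.common.utils.AppInfoParser;

public class ClientVersionCheck {
  public static void main(String[] args) {
    // Should print 3.4.0 if the pom.xml above is the effective dependency.
    System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
    System.out.println("kafka-clients commit:  " + AppInfoParser.getCommitId());
  }
}
{code}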
When the producer is configured with acks=-1, the send fails with the following exception:

 
{code:java}
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. {code}
If acks=1 or acks=0, the same send succeeds:
{code:java}
Sent record(key=null ) meta(partition=6, offset=321496) {code}
acks=-1 is just a producer parameter; how does it affect cluster authorization for the producer?
Is this a bug, or intended behavior?
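A possible explanation to rule in or out (an assumption, not verified against this cluster): since kafka-clients 3.0, enable.idempotence is on by default, and an idempotent producer issues an InitProducerId request that brokers older than 2.8 authorize against the cluster-level IDEMPOTENT_WRITE ACL. When acks=0 or acks=1 is set and idempotence is not explicitly requested, the client quietly disables idempotence, so that request is never sent, which would match the behavior above. If that is the cause, explicitly disabling idempotence should let acks=-1 succeed as well. A minimal sketch of that check (class name and test payload are made up for illustration; broker addresses and credentials are the same placeholders as above):
{code:java}
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class AcksAllWithoutIdempotenceTest {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx:9092,xxxx:9092"); // placeholder brokers
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put(ProducerConfig.ACKS_CONFIG, "all"); // same as acks=-1
    // Assumption under test: the new idempotence default is what triggers the
    // InitProducerId / cluster IDEMPOTENT_WRITE authorization path on a 2.5.1 broker.
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.mechanism", "SCRAM-SHA-256");
    props.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxx\" password=\"xxxx\";");

    try (Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("testdsp.charge.6.cpmshow", null, "ping".getBytes()),
          (metadata, exception) -> {
            if (exception != null) {
              exception.printStackTrace();
            } else {
              System.out.printf("Sent to partition=%d offset=%d%n", metadata.partition(), metadata.offset());
            }
          });
    } // close() flushes the outstanding send before returning
  }
}
{code}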
 
 

> kafka server ClusterAuthorization doesn't support acks=-1 for kafka-client
> version > 3.1.0
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19787
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19787
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.1.1
>            Reporter: zhuming
>            Priority: Major
>


--
This message was sent by Atlassian Jira
(v8.20.10#820010)
