[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql
[ https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Maxim Martynov updated SPARK-44774:
---
Description:
I'm trying to write a batch DataFrame to a Kafka topic with {{mode="error"}}, but when the topic already exists, no exception is raised. Instead, the data is appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
    - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'no'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
    - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start a Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config(
    "spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1"
).getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write with {{mode="append"}} to create the topic, then with {{mode="error"}}, which should raise because the topic already exists:

{code:python}
df = spark.createDataFrame([{"value": "string"}])

# First write: the topic does not exist yet and is auto-created by the broker
# (KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true')
df.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("topic", "new_topic").mode("append").save()

# Second write: expected to fail because "new_topic" already exists,
# but no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content. Two rows were added to the topic instead of one:

{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", "localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}

{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}

It looks like KafkaSourceProvider checks the save mode but does not otherwise use it:
https://github.com/apache/spark/blob/v3.4.1/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So the data is always appended to the topic.
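The {{value}} column in the step 4 output holds raw bytes, as the Kafka source returns them. The plain-Python sketch below (not Spark code; the helper name {{decode_kafka_value}} is made up for illustration, and the payload is assumed to be UTF-8) decodes the printed cells and shows that both offsets contain the same record, i.e. the {{mode="error"}} write was appended instead of rejected.

```python
def decode_kafka_value(shown: str) -> str:
    """Turn a binary cell as printed by DataFrame.show(), e.g. "[73 74 72 69 6E 67]",
    back into text, assuming the payload is UTF-8 encoded."""
    hex_digits = shown.strip().strip("[]")
    # bytes.fromhex() accepts whitespace between the hex pairs
    return bytes.fromhex(hex_digits).decode("utf-8")


# The two value cells copied from the step 4 output (offsets 0 and 1)
rows = ["[73 74 72 69 6E 67]", "[73 74 72 69 6E 67]"]
decoded = [decode_kafka_value(r) for r in rows]
print(decoded)  # ['string', 'string']
```

Inside Spark itself the usual approach is to cast the column instead, e.g. {{.selectExpr("CAST(value AS STRING)")}}.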
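To make the reported gap concrete, here is a small pure-Python model. It is a sketch under stated assumptions, not Spark code: {{current_kafka_behaviour}} paraphrases our reading of the linked KafkaSourceProvider lines (Overwrite and Ignore are rejected, any other mode ends up appending), and {{guarded_write_action}} with {{TopicExistsError}} are hypothetical names for a caller-side guard that gives {{mode="error"}} the semantics the report expects, by checking a list of existing topic names before writing.

```python
from typing import Iterable

# Save-mode strings accepted by DataFrameWriter.mode(), mapped to one canonical name
_MODE_ALIASES = {
    "append": "append",
    "overwrite": "overwrite",
    "ignore": "ignore",
    "error": "errorifexists",
    "errorifexists": "errorifexists",
}


class TopicExistsError(Exception):
    """Hypothetical error for mode="error" when the target topic already exists."""


def normalize_mode(mode: str) -> str:
    try:
        return _MODE_ALIASES[mode.lower()]
    except KeyError:
        raise ValueError(f"Unknown save mode: {mode!r}") from None


def current_kafka_behaviour(mode: str) -> str:
    """Paraphrase of the linked 3.4.1 check: Overwrite and Ignore are rejected,
    every other mode appends, whether or not the topic exists."""
    canonical = normalize_mode(mode)
    if canonical in ("overwrite", "ignore"):
        raise ValueError(f"Save mode {canonical} not allowed for Kafka")
    return "append"


def guarded_write_action(mode: str, topic: str, existing_topics: Iterable[str]) -> str:
    """Hypothetical caller-side guard: fail for mode="error" if the topic exists,
    otherwise defer to the connector's current behaviour."""
    if normalize_mode(mode) == "errorifexists" and topic in set(existing_topics):
        raise TopicExistsError(f"Topic {topic!r} already exists")
    return current_kafka_behaviour(mode)


print(current_kafka_behaviour("error"))  # append (what this issue reports)
print(guarded_write_action("error", "other_topic", ["new_topic"]))  # append
try:
    guarded_write_action("error", "new_topic", ["new_topic"])
except TopicExistsError as exc:
    print(exc)  # Topic 'new_topic' already exists
```

In a real job the list of existing topics would have to come from the broker, for example through an admin client such as confluent-kafka's {{AdminClient.list_topics()}} (an assumed extra dependency). Such a check-then-write guard is not atomic: another producer, or broker-side topic auto-creation, can create the topic between the check and the write. Keeping the check on the caller side also leaves plain {{.save()}} calls untouched, since ErrorIfExists is the writer's default mode.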