[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I'm trying to write a batch DataFrame to a Kafka topic with {{mode="error"}}, but 
when the topic already exists no exception is raised. Instead, the data is 
appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
      - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'no'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write with {{mode="append"}} 
to create the topic, then with {{mode="error"}}, which should raise because the 
topic already exists:
{code:python}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content. It contains 2 rows instead of 1:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
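{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}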
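
The {{value}} column is shown as raw bytes. A quick check in plain Python (no Spark needed) suggests that both rows carry the same payload that was written in step 3. The variable names here are only illustrative:

```python
# Hex dump copied from the `value` column of the output above.
hex_dump = "73 74 72 69 6E 67"

# bytes.fromhex skips the spaces between byte pairs.
decoded = bytes.fromhex(hex_dump).decode("utf-8")
print(decoded)
```

So the second row is a plain duplicate of the first one, written by the {{mode="error"}} call.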

It looks like the mode is checked by KafkaSourceProvider, but is otherwise not used at all:
https://github.com/apache/spark/blob/v3.4.1/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So data is always appended to the topic.
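
To make the difference between the observed and the expected behaviour concrete, here is a small plain-Python model. It is not Spark's code: the function names {{reported_behaviour}} and {{expected_behaviour}} are made up for this sketch, and the rejection of {{overwrite}} and {{ignore}} is only my reading of the linked lines, so treat that part as an assumption.

```python
def reported_behaviour(mode: str, topic_exists: bool) -> str:
    """Model of what the batch Kafka write appears to do today."""
    normalized = mode.lower()
    # Assumption: these two modes are rejected up front by the mode check.
    if normalized in ("overwrite", "ignore"):
        raise ValueError(f"Save mode {mode!r} is not allowed for Kafka")
    # topic_exists is never consulted, so "error" behaves like "append".
    return "append"


def expected_behaviour(mode: str, topic_exists: bool) -> str:
    """Model of what this report expects from mode="error"."""
    normalized = mode.lower()
    if normalized in ("overwrite", "ignore"):
        raise ValueError(f"Save mode {mode!r} is not allowed for Kafka")
    if normalized in ("error", "errorifexists") and topic_exists:
        raise RuntimeError("Topic already exists")
    return "append"


# Observed: the second write from step 3 silently appends.
print(reported_behaviour("error", topic_exists=True))

# Expected: the same call should fail because the topic is already there.
try:
    expected_behaviour("error", topic_exists=True)
except RuntimeError as exc:
    print(f"raised: {exc}")
```

The two functions only differ in the {{topic_exists}} check, which is the piece that seems to be missing.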

[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I'm trying to write a batch DataFrame to a Kafka topic with {{mode="error"}}, but 
when the topic already exists no exception is raised. Instead, the data is 
appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
      - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'yes'
      KAFKA_CLIENT_USERS: onetl
      KAFKA_CLIENT_PASSWORDS: uufoFae9sahSoidoo0eagaidaoreif6z
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write with {{mode="append"}} 
to create the topic, then with {{mode="error"}}, which should raise because the 
topic already exists:
{code:python}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content. It contains 2 rows instead of 1:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
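{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}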

It looks like the mode is checked by KafkaSourceProvider, but is otherwise not used at all:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So data is always appended to the topic.

[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I'm trying to write a batch DataFrame to a Kafka topic with {{mode="error"}}, but 
when the topic already exists no exception is raised. Instead, the data is 
appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
      - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'yes'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write with {{mode="append"}} 
to create the topic, then with {{mode="error"}}, which should raise because the 
topic already exists:
{code:python}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content. It contains 2 rows instead of 1:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
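{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}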

It looks like the mode is checked by KafkaSourceProvider, but is otherwise not used at all:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So data is always appended to the topic.

[jira] [Updated] (SPARK-44774) SaveMode.ErrorIfExists does not work with kafka-sql

2023-08-11 Thread Maxim Martynov (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-44774?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Maxim Martynov updated SPARK-44774:
---
Description: 
I'm trying to write a batch DataFrame to a Kafka topic with {{mode="error"}}, but 
when the topic already exists no exception is raised. Instead, the data is 
appended to the topic.

Steps to reproduce:

1. Start Kafka:

docker-compose.yml
{code:yaml}
version: '3.9'

services:
  zookeeper:
    image: bitnami/zookeeper:3.8
    environment:
      ALLOW_ANONYMOUS_LOGIN: 'yes'

  kafka:
    image: bitnami/kafka:latest
    restart: unless-stopped
    ports:
      - 9093:9093
    environment:
      ALLOW_PLAINTEXT_LISTENER: 'yes'
      KAFKA_ENABLE_KRAFT: 'no'
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL_PLAINTEXT_ANONYMOUS
      KAFKA_CFG_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL_PLAINTEXT_ANONYMOUS://kafka:9092,EXTERNAL_PLAINTEXT_ANONYMOUS://localhost:9093
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT,EXTERNAL_PLAINTEXT_ANONYMOUS:PLAINTEXT
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true'
    depends_on:
      - zookeeper
{code}

{code:bash}
docker-compose up -d
{code}

2. Start Spark session:

{code:bash}
pip install pyspark[sql]==3.4.1
{code}


{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.jars.packages", 
"org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1").getOrCreate()
{code}

3. Create a DataFrame and write it to Kafka. First write with {{mode="append"}} 
to create the topic, then with {{mode="error"}}, which should raise because the 
topic already exists:
{code:python}
df = spark.createDataFrame([{"value": "string"}])
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("append").save()

# no exception is raised
df.write.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("topic", "new_topic").mode("error").save()
{code}

4. Check the topic content. It contains 2 rows instead of 1:
{code:python}
spark.read.format("kafka").option("kafka.bootstrap.servers", 
"localhost:9093").option("subscribe", "new_topic").load().show(10, False)
{code}
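{code}
+----+-------------------+---------+---------+------+-----------------------+-------------+
|key |value              |topic    |partition|offset|timestamp              |timestampType|
+----+-------------------+---------+---------+------+-----------------------+-------------+
|null|[73 74 72 69 6E 67]|new_topic|0        |0     |2023-08-11 09:39:35.813|0            |
|null|[73 74 72 69 6E 67]|new_topic|0        |1     |2023-08-11 09:39:36.122|0            |
+----+-------------------+---------+---------+------+-----------------------+-------------+
{code}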

It looks like the mode is checked by KafkaSourceProvider, but is otherwise not used at all:
https://github.com/apache/spark/blob/6b1ff22dde1ead51cbf370be6e48a802daae58b6/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala#L172-L178

So data is always appended to the topic.
