[ 
https://issues.apache.org/jira/browse/SPARK-49499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rachel Bushrian updated SPARK-49499:
------------------------------------
    Description: 
 

I have a Spark Streaming job that reads data from a Delta table, performs 
transformations, and writes the data to Kafka in Avro format. However, in the 
event of a failure, Spark logs the logical plan, which inadvertently includes 
all Kafka properties, including sensitive mTLS keys.

The code:
{code:java}
val df = spark.readStream.format("delta")
  .option("ignoreCorruptFiles", "true")
  .option("path", fullPath)
  .schema(schema.get)
  .load()

val transformedDF = df.transform(...)

transformedDF.writeStream
  .format("kafka")
  .option("checkpointLocation", cpPath)
  .options(config.conf)
  .trigger(trigger.getOrElse(Trigger.ProcessingTime(0L)))
  .start()

df.sparkSession.streams.awaitAnyTermination()
{code}
The exception:
{code:java}
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException:
org.apache.hadoop.fs.s3a.RemoteFileChangedException: open s3a://.../_delta_log/00000000000001258632.json at 0 on s3a://.../_delta_log/00000000000001258632.json:
software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 412, Request ID: tx0000012a53fe47376ba53-0066cf6372-149bb8-default):PreconditionFailed `s3a://.../_delta_log/00000000000001258632.json': : null (Service: S3, Status Code: 412, Request ID: tx0000012a53fe47376ba53-0066cf6372-149bb8-default)
=== Streaming Query ===
Identifier: [id = 0d9cd948-f417-4688-8c02-355a772299a4, runId = f420b516-3e25-4443-8ae3-9f511b37ff8c]
Current Committed Offsets: {DeltaSource[s3a://...]: {"sourceVersion":1,"reservoirId":"4c9d94b9-37b7-4aa7-bc9e-e914dfdc4ea2","reservoirVersion":1258632,"index":-1,"isStartingVersion":false}}
Current Available Offsets: {DeltaSource[s3a://...]: {"sourceVersion":1,"reservoirId":"4c9d94b9-37b7-4aa7-bc9e-e914dfdc4ea2","reservoirVersion":1258632,"index":-1,"isStartingVersion":false}}
Current State: ACTIVE
Thread State: RUNNABLE
{code}
{code:java}
Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@7945310f, 0d9cd948-f417-4688-8c02-355a772299a4,
[kafka.ssl.keystore.type=PEM, subscribe=dns-correlated-events,
 kafka.ssl.truststore.certificates=-----BEGIN CERTIFICATE----- XXXX -----END CERTIFICATE----- ,
 kafka.max.request.size=2000000,
 kafka.bootstrap.servers=kafkacluster-kafka-bootstrap.kafka.svc.cluster.local:9094,
 kafka.ssl.keystore.key=-----BEGIN PRIVATE KEY----- XXXX -----END PRIVATE KEY----- ,
 checkpointLocation=s3a://…, topic=dns-correlated-events,
 kafka.ssl.truststore.type=PEM,
 kafka.ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- XXXX -----END CERTIFICATE----- ,
 kafka.security.protocol=SSL], Append
+- Project [to_aero(struct(ts, ts#744L, ...
{code}
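The keys and certificates leak because the writer's full options map is embedded, unredacted, in the logical-plan string. As a minimal sketch of the kind of key-name redaction that would prevent this (the `OptionRedactor` helper and its pattern below are hypothetical illustrations, not part of Spark), sensitive values can be masked before an options map is ever rendered into a log message:

```scala
// Hypothetical sketch: mask option values whose keys look sensitive before
// the options map is rendered into a plan/log string. The key patterns are
// assumptions derived from the Kafka SSL options leaked in this report.
object OptionRedactor {
  private val sensitiveKey =
    "(?i).*(keystore\\.key|truststore|certificate|password|secret|token).*".r.pattern

  def redact(options: Map[String, String]): Map[String, String] =
    options.map { case (k, v) =>
      if (sensitiveKey.matcher(k).matches()) k -> "*********(redacted)" else k -> v
    }
}

// The private key is masked; non-sensitive options pass through unchanged.
val conf = Map(
  "kafka.security.protocol" -> "SSL",
  "kafka.ssl.keystore.key"  -> "-----BEGIN PRIVATE KEY----- XXXX -----END PRIVATE KEY-----",
  "kafka.max.request.size"  -> "2000000"
)
println(OptionRedactor.redact(conf))
```

For comparison, Spark already applies this style of redaction in the UI and event logs via `spark.redaction.regex` (whose default matches keys containing secret, password, token, or access.key); whether extending that pattern also covers this streaming logical-plan dump has not been verified here.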
 



> Spark Streaming Exposes Kafka Security Keys to Logs
> ---------------------------------------------------
>
>                 Key: SPARK-49499
>                 URL: https://issues.apache.org/jira/browse/SPARK-49499
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>            Reporter: Rachel Bushrian
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
