[ https://issues.apache.org/jira/browse/SPARK-49499?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rachel Bushrian updated SPARK-49499:
------------------------------------
Description:

I have a Spark Streaming job that reads data from a Delta table, performs transformations, and writes the data to Kafka in Avro format. However, when the query fails, Spark logs the logical plan, which exposes all Kafka properties, including the sensitive mTLS private key and certificates.

The code:
{code:java}
val df = spark.readStream.format("delta")
  .option("ignoreCorruptFiles", "true")
  .option("path", fullPath)
  .schema(schema.get)
  .load()

val transformedDF = df.transform(...)

transformedDF.writeStream
  .format("kafka")
  .option("checkpointLocation", cpPath)
  .options(config.conf)
  .trigger(trigger.getOrElse(Trigger.ProcessingTime(0L)))
  .start()

df.sparkSession.streams.awaitAnyTermination()
{code}
The exception:
{code:java}
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.hadoop.fs.s3a.RemoteFileChangedException: open s3a://.../_delta_log/00000000000001258632.json at 0 on s3a://.../_delta_log/00000000000001258632.json: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 412, Request ID: tx0000012a53fe47376ba53-0066cf6372-149bb8-default):PreconditionFailed `s3a://.../_delta_log/00000000000001258632.json': : null (Service: S3, Status Code: 412, Request ID: tx0000012a53fe47376ba53-0066cf6372-149bb8-default)

=== Streaming Query ===
Identifier: [id = 0d9cd948-f417-4688-8c02-355a772299a4, runId = f420b516-3e25-4443-8ae3-9f511b37ff8c]
Current Committed Offsets: {DeltaSource[s3a://...]: {"sourceVersion":1,"reservoirId":"4c9d94b9-37b7-4aa7-bc9e-e914dfdc4ea2","reservoirVersion":1258632,"index":-1,"isStartingVersion":false}}
Current Available Offsets: {DeltaSource[s3a://...]: {"sourceVersion":1,"reservoirId":"4c9d94b9-37b7-4aa7-bc9e-e914dfdc4ea2","reservoirVersion":1258632,"index":-1,"isStartingVersion":false}}
Current State: ACTIVE
Thread State: RUNNABLE
{code}
{code:java}
Logical Plan:
WriteToMicroBatchDataSource org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@7945310f, 0d9cd948-f417-4688-8c02-355a772299a4, [kafka.ssl.keystore.type=PEM, subscribe=dns-correlated-events, kafka.ssl.truststore.certificates=-----BEGIN CERTIFICATE----- XXXX -----END CERTIFICATE-----, kafka.max.request.size=2000000, kafka.bootstrap.servers=kafkacluster-kafka-bootstrap.kafka.svc.cluster.local:9094, kafka.ssl.keystore.key=-----BEGIN PRIVATE KEY----- XXXX -----END PRIVATE KEY-----, checkpointLocation=s3a://…, topic=dns-correlated-events, kafka.ssl.truststore.type=PEM, kafka.ssl.keystore.certificate.chain=-----BEGIN CERTIFICATE----- XXXX -----END CERTIFICATE-----, kafka.security.protocol=SSL], Append
+- Project [to_aero(struct(ts, ts#744L, ...
{code}
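As a stopgap, it may be possible to widen Spark's redaction settings so that values of options whose keys mention {{kafka.ssl}} are masked wherever redaction is applied. The sketch below uses the existing configs {{spark.redaction.regex}} (config keys/values in the UI and event logs) and {{spark.sql.redaction.options.regex}} (data source option keys in EXPLAIN output); whether the plan string embedded in the StreamingQueryException actually passes through either redaction path in 3.5.1 is an assumption, not verified.
{code:java}
// Workaround sketch (unverified against the StreamingQueryException path):
// widen Spark's redaction patterns so values of kafka.ssl.* options are
// masked wherever redaction is applied.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kafka-redaction-sketch") // hypothetical app name
  // Core redaction of config keys/values in the UI and event logs
  // (default: "(?i)secret|password|token|access[.]key").
  .config("spark.redaction.regex",
    "(?i)secret|password|token|access[.]key|kafka[.]ssl")
  // Redaction of data source option keys in EXPLAIN output
  // (default: "(?i)url").
  .config("spark.sql.redaction.options.regex", "(?i)url|kafka[.]ssl")
  .getOrCreate()
{code}
Where redaction does apply, matching values are replaced with {{*********(redacted)}} rather than being printed.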
> Spark Streaming Exposes Security Kafka Keys to Logs
> ---------------------------------------------------
>
>                 Key: SPARK-49499
>                 URL: https://issues.apache.org/jira/browse/SPARK-49499
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.5.1
>            Reporter: Rachel Bushrian
>            Priority: Major
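Independently of Spark-side redaction, an application-side guard can keep the key material out of the job's own driver log: catch the exception around {{awaitAnyTermination()}} (reusing {{df}} from the snippet above) and scrub PEM blocks from its message before logging, instead of letting it propagate and be printed verbatim. A minimal sketch; the {{scrubPem}} helper is hypothetical, not a Spark API:
{code:java}
import org.apache.spark.sql.streaming.StreamingQueryException

// Hypothetical helper: masks PEM-encoded keys/certificates in free-form text.
def scrubPem(s: String): String =
  s.replaceAll(
    "(?s)-----BEGIN [A-Z ]+-----.*?-----END [A-Z ]+-----",
    "*********(redacted)")

try {
  df.sparkSession.streams.awaitAnyTermination()
} catch {
  case e: StreamingQueryException =>
    // Log the scrubbed text only; rethrowing the original exception would
    // let the default handler print the plan, secrets included.
    System.err.println(scrubPem(e.toString))
    sys.exit(1)
}
{code}
This only protects the application's own logging; Spark's internal rendering of the plan is untouched, so redaction (or a fix in the plan rendering itself) is still needed.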