[spark] branch branch-3.0 updated: [SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files

2020-10-15  gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
 new d0f1120  [SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files
d0f1120 is described below

commit d0f1120f3fb524a52df71e03c3d28ac82f76c1a3
Author: Max Gekk 
AuthorDate: Fri Oct 16 10:28:15 2020 +0900

[SPARK-33163][SQL][TESTS] Check the metadata key 'org.apache.spark.legacyDateTime' in Avro/Parquet files

### What changes were proposed in this pull request?
Added a couple of tests to `AvroSuite` and `ParquetIOSuite` to check that the metadata key 'org.apache.spark.legacyDateTime' is written correctly depending on the SQL configs:
- spark.sql.legacy.avro.datetimeRebaseModeInWrite
- spark.sql.legacy.parquet.datetimeRebaseModeInWrite

This is a follow-up of https://github.com/apache/spark/pull/28137.
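
For illustration, here is a minimal sketch (not part of the patch) of what the new Avro test exercises, assuming a `SparkSession` named `spark` with the Avro data source on the classpath; the output path `/tmp/ts_avro` is hypothetical:
```
import java.sql.Timestamp
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import spark.implicits._

// Write one timestamp with the legacy rebase mode enabled.
spark.conf.set("spark.sql.legacy.avro.datetimeRebaseModeInWrite", "LEGACY")
Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
  .repartition(1)
  .write.format("avro")
  .save("/tmp/ts_avro")

// In LEGACY mode the writer stamps the marker key into the Avro file header
// (with an empty value); in CORRECTED/EXCEPTION modes getMetaString returns null.
val avroFile = new java.io.File("/tmp/ts_avro").listFiles()
  .filter(f => f.isFile && f.getName.endsWith(".avro")).head
val reader = DataFileReader.openReader(avroFile, new GenericDatumReader[GenericRecord]())
  .asInstanceOf[DataFileReader[_]]
assert(reader.getMetaString("org.apache.spark.legacyDateTime") != null)
```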

### Why are the changes needed?
1. To improve test coverage.
2. To make sure that the metadata key is actually saved to Avro/Parquet files.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running the added tests:
```
$ build/sbt "testOnly 
org.apache.spark.sql.execution.datasources.parquet.ParquetIOSuite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV1Suite"
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV2Suite"
```
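
To run only the new tests, ScalaTest's substring filter can be appended (a sketch; assumes the standard ScalaTest runner options available through sbt's `testOnly`):
```
$ build/sbt "avro/test:testOnly org.apache.spark.sql.avro.AvroV1Suite -- -z SPARK-33163"
```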

Closes #30061 from MaxGekk/parquet-test-metakey.

Authored-by: Max Gekk 
Signed-off-by: HyukjinKwon 
(cherry picked from commit 38c05af1d5538fc6ad00cdb57c1a90e90d04e25d)
Signed-off-by: HyukjinKwon 
---
 .../org/apache/spark/sql/avro/AvroSuite.scala  | 40 ++++++++++++++++++++++++++++++++++------
 .../datasources/parquet/ParquetIOSuite.scala   | 51 +++++++++++++++++++++++++++++++++++++++++++------------
 2 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
index d2f49ae..5d7d2e4 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroSuite.scala
@@ -1788,15 +1788,19 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
     }
   }
 
+  private def checkMetaData(path: java.io.File, key: String, expectedValue: String): Unit = {
+    val avroFiles = path.listFiles()
+      .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
+    assert(avroFiles.length === 1)
+    val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
+    val value = reader.asInstanceOf[DataFileReader[_]].getMetaString(key)
+    assert(value === expectedValue)
+  }
+
   test("SPARK-31327: Write Spark version into Avro file metadata") {
     withTempPath { path =>
       spark.range(1).repartition(1).write.format("avro").save(path.getCanonicalPath)
-      val avroFiles = path.listFiles()
-        .filter(f => f.isFile && !f.getName.startsWith(".") && !f.getName.startsWith("_"))
-      assert(avroFiles.length === 1)
-      val reader = DataFileReader.openReader(avroFiles(0), new GenericDatumReader[GenericRecord]())
-      val version = reader.asInstanceOf[DataFileReader[_]].getMetaString(SPARK_VERSION_METADATA_KEY)
-      assert(version === SPARK_VERSION_SHORT)
+      checkMetaData(path, SPARK_VERSION_METADATA_KEY, SPARK_VERSION_SHORT)
     }
   }
 
@@ -1809,6 +1813,30 @@ abstract class AvroSuite extends QueryTest with SharedSparkSession {
       spark.read.format("avro").options(conf).load(path)
     }
   }
+
+  test("SPARK-33163: write the metadata key 'org.apache.spark.legacyDateTime'") {
+    def saveTs(dir: java.io.File): Unit = {
+      Seq(Timestamp.valueOf("2020-10-15 01:02:03")).toDF()
+        .repartition(1)
+        .write
+        .format("avro")
+        .save(dir.getAbsolutePath)
+    }
+    withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) {
+      withTempPath { dir =>
+        saveTs(dir)
+        checkMetaData(dir, SPARK_LEGACY_DATETIME, "")
+      }
+    }
+    Seq(CORRECTED, EXCEPTION).foreach { mode =>
+      withSQLConf(SQLConf.LEGACY_AVRO_REBASE_MODE_IN_WRITE.key -> mode.toString) {
+        withTempPath { dir =>
+          saveTs(dir)
+          checkMetaData(dir, SPARK_LEGACY_DATETIME, null)
+        }
+      }
+    }
+  }
 }
 
 class AvroV1Suite extends AvroSuite {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 2dc8a06..ff406f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
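
The `ParquetIOSuite` hunk is cut off in this archive. For reference, a minimal sketch of how the same footer key could be inspected with parquet-hadoop's public reader API (the helper name and file path are hypothetical):
```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Hypothetical helper: looks up a key in the Parquet footer's key-value metadata.
// Returns null when the key is absent (CORRECTED/EXCEPTION write modes).
def readFooterMeta(file: String, key: String): String = {
  val reader = ParquetFileReader.open(HadoopInputFile.fromPath(new Path(file), new Configuration()))
  try reader.getFooter.getFileMetaData.getKeyValueMetaData.get(key)
  finally reader.close()
}

readFooterMeta("/tmp/ts_parquet/part-00000.parquet", "org.apache.spark.legacyDateTime")
```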
