[ https://issues.apache.org/jira/browse/SPARK-23457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dongjoon Hyun updated SPARK-23457:
----------------------------------
    Description:
ParquetFileFormat leaks open files in some cases. This issue aims to register the task completion listener first.

{code}
  test("SPARK-23390 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
{code}

  was:
ParquetFileFormat leaks open files in some cases.

{code}
  test("SPARK-23390 Register task completion listeners first in ParquetFileFormat") {
    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
      withTempDir { dir =>
        val basePath = dir.getCanonicalPath
        Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
        Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
        val df = spark.read.parquet(
          new Path(basePath, "first").toString,
          new Path(basePath, "second").toString)
        val e = intercept[SparkException] {
          df.collect()
        }
        assert(e.getCause.isInstanceOf[OutOfMemoryError])
      }
    }
  }
{code}


> Register task completion listeners first for Parquet
> ----------------------------------------------------
>
>                 Key: SPARK-23457
>                 URL: https://issues.apache.org/jira/browse/SPARK-23457
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0
>            Reporter: Dongjoon Hyun
>            Priority: Major
>
> ParquetFileFormat leaks open files in some cases. This issue aims to register
> the task completion listener first.
> {code}
>   test("SPARK-23390 Register task completion listeners first in ParquetFileFormat") {
>     withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE.key -> s"${Int.MaxValue}") {
>       withTempDir { dir =>
>         val basePath = dir.getCanonicalPath
>         Seq(0).toDF("a").write.format("parquet").save(new Path(basePath, "first").toString)
>         Seq(1).toDF("a").write.format("parquet").save(new Path(basePath, "second").toString)
>         val df = spark.read.parquet(
>           new Path(basePath, "first").toString,
>           new Path(basePath, "second").toString)
>         val e = intercept[SparkException] {
>           df.collect()
>         }
>         assert(e.getCause.isInstanceOf[OutOfMemoryError])
>       }
>     }
>   }
> {code}
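
The ordering problem named in the description can be illustrated without Spark. The following is only a minimal sketch under stated assumptions: FakeTaskContext, FailingReader and ListenerOrderDemo are hypothetical names invented for illustration, they are not Spark classes, and the simulated IllegalStateException stands in for the OutOfMemoryError that the test above provokes. It shows why a reader that fails during initialization is closed only if its cleanup listener was registered beforehand.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a task context; this is NOT Spark's TaskContext.
final class FakeTaskContext {
  private val listeners = ArrayBuffer.empty[() => Unit]

  def addCompletionListener(f: () => Unit): Unit = {
    listeners += f
  }

  // Runs every registered listener, as a task would at completion,
  // whether it succeeded or failed.
  def markTaskCompleted(): Unit = listeners.foreach(f => f())
}

// Hypothetical reader: it counts as "open" once constructed and always
// fails in initialize(), simulating an error raised while setting up a read.
final class FailingReader extends AutoCloseable {
  var closed = false

  def initialize(): Unit =
    throw new IllegalStateException("simulated failure in initialize")

  override def close(): Unit = {
    closed = true
  }
}

object ListenerOrderDemo {
  // Runs one simulated task and reports whether the reader ended up closed.
  def runTask(registerFirst: Boolean): Boolean = {
    val context = new FakeTaskContext
    val reader = new FailingReader
    try {
      if (registerFirst) {
        // Cleanup is registered before anything can throw.
        context.addCompletionListener(() => reader.close())
        reader.initialize()
      } else {
        // initialize() throws, so the listener is never registered.
        reader.initialize()
        context.addCompletionListener(() => reader.close())
      }
    } catch {
      case _: IllegalStateException => // the task fails; completion still runs
    } finally {
      context.markTaskCompleted()
    }
    reader.closed
  }

  def main(args: Array[String]): Unit = {
    println(s"listener registered first: closed = ${runTask(registerFirst = true)}")
    println(s"listener registered last:  closed = ${runTask(registerFirst = false)}")
  }
}
```

In this sketch the reader is left open only in the second ordering, which is the kind of leak the issue describes. Whether the actual change to ParquetFileFormat follows exactly this shape is not stated in this message.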