This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f7c85b8  [SPARK-36456][CORE][SQL][SS] Clean up compilation warnings related to `method closeQuietly in class IOUtils is deprecated`
f7c85b8 is described below

commit f7c85b855ba99757c750dd0a2f7aced788c89374
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Aug 10 23:17:57 2021 -0700

    [SPARK-36456][CORE][SQL][SS] Clean up compilation warnings related to `method closeQuietly in class IOUtils is deprecated`

    ### What changes were proposed in this pull request?
    There are some compilation warnings related to `method closeQuietly in class IOUtils is deprecated`; `Apache commons-io` suggests using the `try-with-resources` statement or handling suppressed exceptions manually. The main change of this PR is to replace `o.a.commons.io.IOUtils.closeQuietly` with `o.a.s.network.util.JavaUtils.closeQuietly` directly, because all of the original call sites intend to suppress `IOException`.

    ### Why are the changes needed?
    Clean up the compilation warnings related to `method closeQuietly in class IOUtils is deprecated`.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    - Pass the Jenkins or GitHub Actions tests.

    Closes #33682 from LuciferYang/closeQuietly.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala    | 5 ++---
 .../org/apache/spark/storage/ShuffleBlockFetcherIterator.scala     | 5 ++---
 core/src/main/scala/org/apache/spark/util/Utils.scala              | 4 ++--
 .../scala/org/apache/spark/util/logging/RollingFileAppender.scala  | 5 +++--
 core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala  | 3 ++-
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala         | 6 +++---
 .../org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 4 ++--
 .../org/apache/spark/sql/execution/streaming/StreamMetadata.scala  | 4 ++--
 .../execution/streaming/state/HDFSBackedStateStoreProvider.scala   | 4 ++--
 .../spark/sql/execution/streaming/state/RocksDBFileManager.scala   | 5 +++--
 10 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 4c052db..92b3608 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,7 +35,6 @@ import scala.util.control.NonFatal
 
 import com.codahale.metrics.{MetricRegistry, MetricSet}
 import com.github.benmanes.caffeine.cache.Caffeine
-import org.apache.commons.io.IOUtils
 
 import org.apache.spark._
 import org.apache.spark.errors.SparkCoreErrors
@@ -52,7 +51,7 @@ import org.apache.spark.network.netty.SparkTransportConf
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
-import org.apache.spark.network.util.TransportConf
+import org.apache.spark.network.util.{JavaUtils, TransportConf}
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.ExecutorCacheTaskLocation
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
@@ -342,7 +341,7 @@ private[spark] class BlockManager(
         false
       }
     } finally {
-      IOUtils.closeQuietly(inputStream)
+      JavaUtils.closeQuietly(inputStream)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index eaecf65..b1713ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -29,7 +29,6 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 import scala.util.{Failure, Success}
 
 import io.netty.util.internal.OutOfDirectMemoryError
-import org.apache.commons.io.IOUtils
 import org.roaringbitmap.RoaringBitmap
 
 import org.apache.spark.{MapOutputTracker, TaskContext}
@@ -39,7 +38,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
 import org.apache.spark.network.shuffle._
 import org.apache.spark.network.shuffle.checksum.{Cause, ShuffleChecksumHelper}
-import org.apache.spark.network.util.{NettyUtils, TransportConf}
+import org.apache.spark.network.util.{JavaUtils, NettyUtils, TransportConf}
 import org.apache.spark.shuffle.ShuffleReadMetricsReporter
 import org.apache.spark.util.{CompletionIterator, TaskCompletionListener, Utils}
 
@@ -1304,7 +1303,7 @@ private class BufferReleasingInputStream(
         val diagnosisResponse = checkedInOpt.map { checkedIn =>
           iterator.diagnoseCorruption(checkedIn, address, blockId)
         }
-        IOUtils.closeQuietly(this)
+        JavaUtils.closeQuietly(this)
         // We'd never retry the block whatever the cause is since the block has been
         // partially consumed by downstream RDDs.
         iterator.throwFetchFailedException(blockId, mapIndex, address, e, diagnosisResponse)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index f3268cb..e1c26a4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -3139,8 +3139,8 @@ private[spark] object Utils extends Logging {
       logInfo(s"Unzipped from $dfsZipFile\n\t${files.mkString("\n\t")}")
     } finally {
       // Close everything no matter what happened
-      IOUtils.closeQuietly(in)
-      IOUtils.closeQuietly(out)
+      JavaUtils.closeQuietly(in)
+      JavaUtils.closeQuietly(out)
     }
     files.toSeq
   }
diff --git a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
index 68a5923..10363a9 100644
--- a/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
+++ b/core/src/main/scala/org/apache/spark/util/logging/RollingFileAppender.scala
@@ -25,6 +25,7 @@ import org.apache.commons.io.IOUtils
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config
+import org.apache.spark.network.util.JavaUtils
 
 /**
  * Continuously appends data from input stream into the given file, and rolls
@@ -94,8 +95,8 @@ private[spark] class RollingFileAppender(
         gzOutputStream.close()
         activeFile.delete()
       } finally {
-        IOUtils.closeQuietly(inputStream)
-        IOUtils.closeQuietly(gzOutputStream)
+        JavaUtils.closeQuietly(inputStream)
+        JavaUtils.closeQuietly(gzOutputStream)
       }
     } else {
       Files.move(activeFile, rolloverFile)
diff --git a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
index 71010a1..1197bea 100644
--- a/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/FileAppenderSuite.scala
@@ -35,6 +35,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.internal.{config, Logging}
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.util.logging.{FileAppender, RollingFileAppender, SizeBasedRollingPolicy, TimeBasedRollingPolicy}
 
 class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
@@ -380,7 +381,7 @@ class FileAppenderSuite extends SparkFunSuite with BeforeAndAfter with Logging {
       try {
         IOUtils.toString(inputStream, StandardCharsets.UTF_8)
       } finally {
-        IOUtils.closeQuietly(inputStream)
+        JavaUtils.closeQuietly(inputStream)
       }
     } else {
       Files.toString(file, StandardCharsets.UTF_8)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 677efec..c1b7b5f 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.Tests.IS_TESTING
 import org.apache.spark.launcher.SparkLauncher
-import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.scheduler.SparkListener
 import org.apache.spark.util.io.ChunkedByteBufferInputStream
 
@@ -245,8 +245,8 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       assert(mergedStream.read() === -1)
       assert(byteBufferInputStream.chunkedByteBuffer === null)
     } finally {
-      IOUtils.closeQuietly(mergedStream)
-      IOUtils.closeQuietly(in)
+      JavaUtils.closeQuietly(mergedStream)
+      JavaUtils.closeQuietly(in)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index 8a037b5..2a4e064 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -22,12 +22,12 @@ import java.nio.charset.StandardCharsets
 
 import scala.reflect.ClassTag
 
-import org.apache.commons.io.IOUtils
 import org.apache.hadoop.fs._
 import org.json4s.NoTypeHints
 import org.json4s.jackson.Serialization
 
 import org.apache.spark.internal.Logging
+import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.errors.QueryExecutionErrors
 
@@ -147,7 +147,7 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           throw new IllegalStateException(
             s"Failed to read log file $batchMetadataFile. ${ise.getMessage}", ise)
${ise.getMessage}", ise) } finally { - IOUtils.closeQuietly(input) + JavaUtils.closeQuietly(input) } } else { throw QueryExecutionErrors.batchMetadataFileNotFoundError(batchMetadataFile) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala index cb18988..b46be4f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetadata.scala @@ -22,13 +22,13 @@ import java.nio.charset.StandardCharsets import scala.util.control.NonFatal -import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileAlreadyExistsException, FSDataInputStream, Path} import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging +import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream @@ -63,7 +63,7 @@ object StreamMetadata extends Logging { logError(s"Error reading stream metadata from $metadataFile", e) throw e } finally { - IOUtils.closeQuietly(input) + JavaUtils.closeQuietly(input) } } else None } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala index 75b7dae..ce2bbe8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala @@ -27,13 +27,13 @@ import scala.collection.mutable import scala.util.control.NonFatal import com.google.common.io.ByteStreams -import org.apache.commons.io.IOUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs._ import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.io.CompressionCodec +import org.apache.spark.network.util.JavaUtils import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.streaming.CheckpointFileManager @@ -542,7 +542,7 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with rawStream: CancellableFSDataOutputStream): Unit = { try { if (rawStream != null) rawStream.cancel() - IOUtils.closeQuietly(compressedStream) + JavaUtils.closeQuietly(compressedStream) } catch { case e: FSError if e.getCause.isInstanceOf[IOException] => // Closing the compressedStream causes the stream to write/flush flush data into the diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala index 23cdbd0..3378064 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala @@ -38,6 +38,7 @@ import org.json4s.NoTypeHints import org.json4s.jackson.Serialization import org.apache.spark.internal.Logging +import org.apache.spark.network.util.JavaUtils import 
 import org.apache.spark.util.Utils
 
@@ -458,8 +459,8 @@ class RocksDBFileManager(
         throw e
     } finally {
       // Close everything no matter what happened
-      IOUtils.closeQuietly(in)
-      IOUtils.closeQuietly(zout)
+      JavaUtils.closeQuietly(in)
+      JavaUtils.closeQuietly(zout)
     }
   }
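
For readers unfamiliar with the replacement utility: the pattern adopted throughout this commit is a `try`/`finally` block that closes the stream via `JavaUtils.closeQuietly`, which swallows (and logs) any `IOException` raised by `close()`, matching the behavior of the deprecated `IOUtils.closeQuietly`. Below is a minimal sketch of that pattern, not code from the commit; the `CloseQuietlyExample` object and its `readFirstLine` helper are hypothetical, and the sketch assumes the artifact providing `o.a.s.network.util.JavaUtils` (spark-network-common) is on the classpath.

    import java.io.{BufferedReader, FileReader}

    import org.apache.spark.network.util.JavaUtils

    object CloseQuietlyExample {
      // Hypothetical helper: read the first line of a file, then close the
      // reader while suppressing any IOException thrown by close().
      def readFirstLine(path: String): String = {
        val reader = new BufferedReader(new FileReader(path))
        try {
          reader.readLine()
        } finally {
          // JavaUtils.closeQuietly closes the Closeable and ignores IOException,
          // so a failure during close() cannot mask the result (or an exception)
          // produced by the try block above.
          JavaUtils.closeQuietly(reader)
        }
      }
    }

Where the suppress-on-close semantics are not required, `scala.util.Using` (Scala 2.13+) is the Scala analogue of the `try-with-resources` statement that `Apache commons-io` recommends; note that it propagates failures from `close()` rather than suppressing them.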