This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 093fbf1aa852 [SPARK-45831][CORE][SQL][DSTREAM] Use collection factory instead to create immutable Java collections
093fbf1aa852 is described below

commit 093fbf1aa8520193b8d929f9f855afe0aded20a1
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Nov 8 19:23:29 2023 -0800

    [SPARK-45831][CORE][SQL][DSTREAM] Use collection factory instead to create immutable Java collections
    
    ### What changes were proposed in this pull request?
    This PR switches from `Collections.unmodifiable*` wrappers to the collection factory methods for creating immutable Java collections (the new collection APIs introduced by [JEP 269](https://openjdk.org/jeps/269)).
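
    As a minimal illustration (not taken from the patch), the factory methods return self-contained immutable collections, whereas `Collections.unmodifiableMap` only wraps a live view of a mutable backing map; the factories also reject null keys/values and duplicate keys:

        import java.util.Collections;
        import java.util.HashMap;
        import java.util.Map;

        public class FactoryDemo {
          public static void main(String[] args) {
            // Old pattern: an unmodifiable *view* over a still-mutable builder.
            Map<String, Integer> builder = new HashMap<>();
            builder.put("a", 1);
            Map<String, Integer> wrapped = Collections.unmodifiableMap(builder);
            builder.put("b", 2);             // mutation is visible through the view
            System.out.println(wrapped);     // e.g. {a=1, b=2}

            // JEP 269 factory: a self-contained immutable map.
            Map<String, Integer> immutable = Map.of("a", 1, "b", 2);
            // immutable.put("c", 3);        // UnsupportedOperationException
            // Map.of("a", 1, "a", 2);       // IllegalArgumentException: duplicate key
            // Map.of("a", (Integer) null);  // NullPointerException
            System.out.println(immutable);
          }
        }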
    
    ### Why are the changes needed?
    Makes the relevant code simpler and clearer.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43709 from LuciferYang/collection-factory.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/network/util/JavaUtils.java   | 44 +++++++++++-----------
 .../main/scala/org/apache/spark/FutureAction.scala |  5 +--
 .../org/apache/spark/util/AccumulatorV2.scala      |  3 +-
 .../parquet/SpecificParquetRecordReaderBase.java   |  5 +--
 .../org/apache/spark/streaming/JavaAPISuite.java   |  4 +-
 5 files changed, 26 insertions(+), 35 deletions(-)

diff --git a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
index bbe764b8366c..fa0a2629f350 100644
--- a/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
+++ b/common/utils/src/main/java/org/apache/spark/network/util/JavaUtils.java
@@ -202,29 +202,27 @@ public class JavaUtils {
   private static final Map<String, ByteUnit> byteSuffixes;
 
   static {
-    final Map<String, TimeUnit> timeSuffixesBuilder = new HashMap<>();
-    timeSuffixesBuilder.put("us", TimeUnit.MICROSECONDS);
-    timeSuffixesBuilder.put("ms", TimeUnit.MILLISECONDS);
-    timeSuffixesBuilder.put("s", TimeUnit.SECONDS);
-    timeSuffixesBuilder.put("m", TimeUnit.MINUTES);
-    timeSuffixesBuilder.put("min", TimeUnit.MINUTES);
-    timeSuffixesBuilder.put("h", TimeUnit.HOURS);
-    timeSuffixesBuilder.put("d", TimeUnit.DAYS);
-    timeSuffixes = Collections.unmodifiableMap(timeSuffixesBuilder);
-
-    final Map<String, ByteUnit> byteSuffixesBuilder = new HashMap<>();
-    byteSuffixesBuilder.put("b", ByteUnit.BYTE);
-    byteSuffixesBuilder.put("k", ByteUnit.KiB);
-    byteSuffixesBuilder.put("kb", ByteUnit.KiB);
-    byteSuffixesBuilder.put("m", ByteUnit.MiB);
-    byteSuffixesBuilder.put("mb", ByteUnit.MiB);
-    byteSuffixesBuilder.put("g", ByteUnit.GiB);
-    byteSuffixesBuilder.put("gb", ByteUnit.GiB);
-    byteSuffixesBuilder.put("t", ByteUnit.TiB);
-    byteSuffixesBuilder.put("tb", ByteUnit.TiB);
-    byteSuffixesBuilder.put("p", ByteUnit.PiB);
-    byteSuffixesBuilder.put("pb", ByteUnit.PiB);
-    byteSuffixes = Collections.unmodifiableMap(byteSuffixesBuilder);
+    timeSuffixes = Map.of(
+      "us", TimeUnit.MICROSECONDS,
+      "ms", TimeUnit.MILLISECONDS,
+      "s", TimeUnit.SECONDS,
+      "m", TimeUnit.MINUTES,
+      "min", TimeUnit.MINUTES,
+      "h", TimeUnit.HOURS,
+      "d", TimeUnit.DAYS);
+
+    byteSuffixes = Map.ofEntries(
+      Map.entry("b", ByteUnit.BYTE),
+      Map.entry("k", ByteUnit.KiB),
+      Map.entry("kb", ByteUnit.KiB),
+      Map.entry("m", ByteUnit.MiB),
+      Map.entry("mb", ByteUnit.MiB),
+      Map.entry("g", ByteUnit.GiB),
+      Map.entry("gb", ByteUnit.GiB),
+      Map.entry("t", ByteUnit.TiB),
+      Map.entry("tb", ByteUnit.TiB),
+      Map.entry("p", ByteUnit.PiB),
+      Map.entry("pb", ByteUnit.PiB));
   }
 
   /**
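
A note on the two factory choices above: `Map.of` has fixed-arity overloads for at most 10 key-value pairs, so the seven-entry `timeSuffixes` map can use it directly while the eleven-entry `byteSuffixes` map uses `Map.ofEntries`, which takes any number of entries. A minimal sketch of the latter, with illustrative names not taken from the patch:

    import java.util.Map;

    public class MapOfEntriesDemo {
      public static void main(String[] args) {
        // Map.of tops out at 10 key-value pairs; Map.ofEntries is unbounded.
        Map<String, Long> sizes = Map.ofEntries(
          Map.entry("k", 1L << 10),
          Map.entry("m", 1L << 20),
          Map.entry("g", 1L << 30));
        System.out.println(sizes.get("m"));  // 1048576
      }
    }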
diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala
index 9100d4ce041b..a68700421b8d 100644
--- a/core/src/main/scala/org/apache/spark/FutureAction.scala
+++ b/core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark
 
-import java.util.Collections
 import java.util.concurrent.TimeUnit
 
 import scala.concurrent._
@@ -255,8 +254,6 @@ private[spark]
 class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S => T)
   extends JavaFutureAction[T] {
 
-  import scala.jdk.CollectionConverters._
-
   override def isCancelled: Boolean = futureAction.isCancelled
 
   override def isDone: Boolean = {
@@ -266,7 +263,7 @@ class JavaFutureActionWrapper[S, T](futureAction: FutureAction[S], converter: S
   }
 
   override def jobIds(): java.util.List[java.lang.Integer] = {
-    Collections.unmodifiableList(futureAction.jobIds.map(Integer.valueOf).asJava)
+    java.util.List.of(futureAction.jobIds.map(Integer.valueOf): _*)
   }
 
   private def getImpl(timeout: Duration): T = {
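
Here `java.util.List.of(seq: _*)` expands the Scala sequence into the varargs overload of `List.of`, which also drops the `asJava` conversion and its import. One behavioral note: unlike `Collections.unmodifiableList`, `List.of` rejects null elements, which is safe here because the job IDs are boxed from primitive Ints. A minimal Java sketch of the same API, with illustrative values:

    import java.util.List;

    public class ListOfDemo {
      public static void main(String[] args) {
        Integer[] jobIds = {0, 1, 2};         // illustrative values
        List<Integer> ids = List.of(jobIds);  // varargs overload, immutable result
        System.out.println(ids);              // [0, 1, 2]
        // List.of(new Integer[] {null});     // would throw NullPointerException
      }
    }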
diff --git a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
index 181033c9d20c..c6d8073a0c2f 100644
--- a/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
+++ b/core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -19,7 +19,6 @@ package org.apache.spark.util
 
 import java.{lang => jl}
 import java.io.ObjectInputStream
-import java.util.ArrayList
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
 
@@ -505,7 +504,7 @@ class CollectionAccumulator[T] extends AccumulatorV2[T, java.util.List[T]] {
   }
 
   override def value: java.util.List[T] = this.synchronized {
-    java.util.Collections.unmodifiableList(new ArrayList[T](getOrCreate))
+    java.util.List.copyOf(getOrCreate)
   }
 
   private[spark] def setValue(newValue: java.util.List[T]): Unit = this.synchronized {
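
`List.copyOf` already takes a defensive snapshot, so the explicit `new ArrayList[T](...)` copy becomes redundant. A minimal sketch of the copy semantics, with illustrative names:

    import java.util.ArrayList;
    import java.util.List;

    public class CopyOfDemo {
      public static void main(String[] args) {
        List<String> live = new ArrayList<>(List.of("a", "b"));
        List<String> snapshot = List.copyOf(live);  // independent immutable copy
        live.add("c");
        System.out.println(snapshot);               // [a, b], unaffected by the add
        // JDK implementation note: copyOf of an already-immutable list
        // generally returns the same instance rather than a second copy.
        System.out.println(List.copyOf(snapshot) == snapshot);
      }
    }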
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
index 4f2b65f36120..6d00048154a5 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -238,9 +237,7 @@ public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Vo
   private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
     Map<K, Set<V>> setMultiMap = new HashMap<>();
     for (Map.Entry<K, V> entry : map.entrySet()) {
-      Set<V> set = new HashSet<>();
-      set.add(entry.getValue());
-      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
+      setMultiMap.put(entry.getKey(), Set.of(entry.getValue()));
     }
     return Collections.unmodifiableMap(setMultiMap);
   }
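
`Set.of(entry.getValue())` collapses the three-line HashSet dance into one call. One hedged caveat: `Set.of` rejects null elements where the old `HashSet` path tolerated them, which presumably cannot occur for the non-null metadata values passed through `toSetMultiMap`. A minimal sketch:

    import java.util.Set;

    public class SetOfDemo {
      public static void main(String[] args) {
        Set<String> one = Set.of("value");          // immutable single-element set
        System.out.println(one.contains("value"));  // true
        // Set.of((String) null);                   // would throw NullPointerException
        // Set.of("a", "a");                        // would throw IllegalArgumentException
      }
    }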
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index b1f743b92196..f8d961fa8dd8 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -704,11 +704,11 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
       List<List<T>> expected, List<List<T>> actual) {
     List<Set<T>> expectedSets = new ArrayList<>();
     for (List<T> list: expected) {
-      expectedSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
+      expectedSets.add(Set.copyOf(list));
     }
     List<Set<T>> actualSets = new ArrayList<>();
     for (List<T> list: actual) {
-      actualSets.add(Collections.unmodifiableSet(new HashSet<>(list)));
+      actualSets.add(Set.copyOf(list));
     }
     Assertions.assertEquals(expectedSets, actualSets);
   }
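
In this test helper, `Set.copyOf(list)` preserves the duplicate-collapsing behavior of `new HashSet<>(list)`: unlike `Set.of`, which throws `IllegalArgumentException` on duplicates, `Set.copyOf` keeps an arbitrary one of any duplicated elements. A minimal sketch:

    import java.util.List;
    import java.util.Set;

    public class SetCopyOfDemo {
      public static void main(String[] args) {
        // Duplicates collapse, matching the old new HashSet<>(list) behavior;
        // Set.of("a", "b", "a") would instead throw IllegalArgumentException.
        Set<String> s = Set.copyOf(List.of("a", "b", "a"));
        System.out.println(s.size());  // 2
      }
    }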


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
