jerqi commented on code in PR #9194:
URL: https://github.com/apache/iceberg/pull/9194#discussion_r1413426617


##########
core/src/main/java/org/apache/iceberg/util/PartitionMap.java:
##########
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.util;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+
+/**
+ * A map that uses a pair of spec ID and partition tuple as keys.
+ *
+ * <p>This implementation internally stores provided partition tuples in {@link StructLikeMap} for
+ * consistent hashing and equals behavior. This ensures that objects of different types that
+ * represent the same structs are treated as equal keys in the map.
+ *
+ * <p>Note: This map is not designed for concurrent modification by multiple threads. However, it
+ * supports safe concurrent reads, assuming there are no concurrent writes.
+ *
+ * <p>Note: This map does not support null pairs but supports null as partition tuples.
+ *
+ * @param <V> the type of values
+ */
+public class PartitionMap<V> implements Map<Pair<Integer, StructLike>, V> {
+
+  private final Map<Integer, PartitionSpec> specs;
+  private final Map<Integer, Map<StructLike, V>> partitionMaps;
+
+  private PartitionMap(Map<Integer, PartitionSpec> specs) {
+    this.specs = specs;
+    this.partitionMaps = Maps.newHashMap();
+  }
+
+  public static <T> PartitionMap<T> create(Map<Integer, PartitionSpec> specs) {
+    return new PartitionMap<>(specs);
+  }
+
+  @Override
+  public int size() {
+    return partitionMaps.values().stream().mapToInt(Map::size).sum();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return partitionMaps.values().stream().allMatch(Map::isEmpty);
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    if (key instanceof Pair) {
+      Object first = ((Pair<?, ?>) key).first();
+      Object second = ((Pair<?, ?>) key).second();
+      if (first instanceof Integer && second instanceof StructLike) {
+        return containsKey((Integer) first, (StructLike) second);
+      }
+    }
+
+    return false;
+  }
+
+  public boolean containsKey(int specId, StructLike struct) {
+    Map<StructLike, V> partitionMap = partitionMaps.get(specId);
+    return partitionMap != null && partitionMap.containsKey(struct);
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    return partitionMaps.values().stream().anyMatch(map -> 
map.containsValue(value));
+  }
+
+  @Override
+  public V get(Object key) {
+    if (key instanceof Pair) {
+      Object first = ((Pair<?, ?>) key).first();
+      Object second = ((Pair<?, ?>) key).second();
+      if (first instanceof Integer && second instanceof StructLike) {
+        return get((Integer) first, (StructLike) second);
+      }
+    }
+
+    return null;
+  }
+
+  public V get(int specId, StructLike struct) {
+    Map<StructLike, V> partitionMap = partitionMaps.get(specId);
+    return partitionMap != null ? partitionMap.get(struct) : null;
+  }
+
+  @Override
+  public V put(Pair<Integer, StructLike> key, V value) {
+    return put(key.first(), key.second(), value);
+  }
+
+  public V put(int specId, StructLike struct, V value) {
+    Map<StructLike, V> partitionMap = partitionMaps.computeIfAbsent(specId, 
this::newPartitionMap);
+    return partitionMap.put(struct, value);
+  }
+
+  @Override
+  public void putAll(Map<? extends Pair<Integer, StructLike>, ? extends V> 
otherMap) {
+    otherMap.forEach(this::put);
+  }
+
+  @Override
+  public V remove(Object key) {
+    if (key instanceof Pair) {
+      Object first = ((Pair<?, ?>) key).first();
+      Object second = ((Pair<?, ?>) key).second();
+      if (first instanceof Integer && second instanceof StructLike) {
+        return removeKey((Integer) first, (StructLike) second);
+      }
+    }
+
+    return null;
+  }
+
+  public V removeKey(int specId, StructLike struct) {
+    Map<StructLike, V> partitionMap = partitionMaps.get(specId);
+    return partitionMap != null ? partitionMap.remove(struct) : null;
+  }
+
+  @Override
+  public void clear() {
+    partitionMaps.clear();
+  }
+
+  @Override
+  public Set<Pair<Integer, StructLike>> keySet() {
+    PartitionSet keySet = PartitionSet.create(specs);
+
+    for (Entry<Integer, Map<StructLike, V>> specIdAndPartitionMap : 
partitionMaps.entrySet()) {
+      int specId = specIdAndPartitionMap.getKey();
+      Map<StructLike, V> partitionMap = specIdAndPartitionMap.getValue();
+      for (StructLike partition : partitionMap.keySet()) {
+        keySet.add(specId, partition);
+      }
+    }
+
+    return Collections.unmodifiableSet(keySet);
+  }
+
+  @Override
+  public Collection<V> values() {
+    List<V> values = Lists.newArrayList();
+
+    for (Map<StructLike, V> partitionMap : partitionMaps.values()) {
+      values.addAll(partitionMap.values());
+    }
+
+    return Collections.unmodifiableCollection(values);
+  }
+
+  @Override
+  public Set<Entry<Pair<Integer, StructLike>, V>> entrySet() {
+    Set<Entry<Pair<Integer, StructLike>, V>> entrySet = Sets.newHashSet();
+
+    for (Entry<Integer, Map<StructLike, V>> specIdAndPartitionMap : 
partitionMaps.entrySet()) {
+      int specId = specIdAndPartitionMap.getKey();
+      Map<StructLike, V> partitionMap = specIdAndPartitionMap.getValue();
+      for (Entry<StructLike, V> structAndValue : partitionMap.entrySet()) {
+        StructLike struct = structAndValue.getKey();
+        V value = structAndValue.getValue();
+        entrySet.add(new PartitionEntry<>(specId, struct, value));
+      }
+    }
+
+    return Collections.unmodifiableSet(entrySet);
+  }
+
+  public V computeIfAbsent(int specId, StructLike struct, Supplier<V> 
valueSupplier) {
+    Map<StructLike, V> partitionMap = partitionMaps.computeIfAbsent(specId, 
this::newPartitionMap);
+    return partitionMap.computeIfAbsent(struct, key -> valueSupplier.get());
+  }
+
+  private Map<StructLike, V> newPartitionMap(int specId) {
+    PartitionSpec spec = specs.get(specId);

Review Comment:
   What will happen if `specs` doesn't contain `specId`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to