Github user kiszk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23124#discussion_r235952965
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.util
    +
    +import scala.collection.mutable
    +
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
    +
    +/**
    + * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
    + * duplicated map keys w.r.t. the last wins policy.
    + */
    +class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
    +  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
    +
    +  private lazy val keyToIndex = keyType match {
    +    case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
    +    case _ =>
    +      // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
    +      // data, e.g. UnsafeRow vs GenericInternalRow.
    +      mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
    +  }
    +
    +  // TODO: specialize it
    +  private lazy val keys = mutable.ArrayBuffer.empty[Any]
    +  private lazy val values = mutable.ArrayBuffer.empty[Any]
    +
    +  private lazy val keyGetter = InternalRow.getAccessor(keyType)
    +  private lazy val valueGetter = InternalRow.getAccessor(valueType)
    +
    +  def reset(): Unit = {
    +    keyToIndex.clear()
    +    keys.clear()
    +    values.clear()
    +  }
    +
    +  def put(key: Any, value: Any): Unit = {
    +    if (key == null) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +
    +    val maybeExistingIdx = keyToIndex.get(key)
    +    if (maybeExistingIdx.isDefined) {
    +      // Overwrite the previous value, as the policy is last wins.
    +      values(maybeExistingIdx.get) = value
    +    } else {
    +      keyToIndex.put(key, values.length)
    +      keys.append(key)
    +      values.append(value)
    +    }
    +  }
    +
    +  // write a 2-field row, the first field is key and the second field is 
value.
    +  def put(entry: InternalRow): Unit = {
    +    if (entry.isNullAt(0)) {
    +      throw new RuntimeException("Cannot use null as map key.")
    +    }
    +    put(keyGetter(entry, 0), valueGetter(entry, 1))
    +  }
    +
    +  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    +    if (keyArray.length != valueArray.length) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same 
length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.length) {
    +      put(keyArray(i), valueArray(i))
    +      i += 1
    +    }
    +  }
    +
    +  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
    +    if (keyArray.numElements() != valueArray.numElements()) {
    +      throw new RuntimeException(
    +        "The key array and value array of MapData must have the same 
length.")
    +    }
    +
    +    var i = 0
    +    while (i < keyArray.numElements()) {
    +      put(keyGetter(keyArray, i), valueGetter(valueArray, i))
    +      i += 1
    +    }
    +  }
    +
    +  def build(): ArrayBasedMapData = {
    +    new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
    --- End diff --
    
    
    
    Is it better to call reset() after calling new ArrayBasedMapData to reduce 
memory consumption in Java heap?
    
    At caller side, ArrayBasedMapBuilder is not released. Therefore, until 
reset() will be called next time, each ArrayBasedMapBuilder keeps unused data 
in keys, values, and keyToIndex. They consumes Java heap unexpectedly.



---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to