[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-12-09 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r240104815
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -27,6 +27,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, float/double -0.0 is semantically 
equal to 0.0, but users can still distinguish them via `Dataset.show`, 
`Dataset.collect` etc. Since Spark 3.0, float/double -0.0 is replaced by 0.0 
internally, and users can't distinguish them any more.
 
+  - In Spark version 2.4 and earlier, users can create a map with 
duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. 
The behavior of map with duplicated keys is undefined, e.g. map look up 
respects the duplicated key appears first, `Dataset.collect` only keeps the 
duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since 
Spark 3.0, these built-in functions will remove duplicated map keys with last 
wins policy. Users may still read map values with duplicated keys from data 
sources which do not enforce it (e.g. Parquet), the behavior will be udefined.
--- End diff --

A few typos. How about this?

```
In Spark version 2.4 and earlier, users can create a map with duplicate 
keys via built-in functions like `CreateMap` and `StringToMap`. The behavior of 
map with duplicate keys is undefined. For example, the map lookup respects the 
duplicate key that appears first, `Dataset.collect` only keeps the duplicate 
key that appears last, and `MapKeys` returns duplicate keys. Since Spark 3.0, 
these built-in functions will remove duplicate map keys using the last-one-wins 
policy. Users may still read map values with duplicate keys from the data 
sources that do not enforce it (e.g. Parquet), but the behavior will be 
undefined.
```
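
The last-one-wins policy described above can be sketched in plain Scala, without any Spark dependency. This is only an illustration under stated assumptions: `LastWinsDedup` and `dedup` are hypothetical names that do not exist in Spark, and the sketch assumes keys with well-defined `equals`/`hashCode`.

```scala
import scala.collection.mutable

// Hypothetical helper, not Spark code: removes duplicate keys with a
// last-one-wins policy. A key keeps the position of its first occurrence,
// but takes the value of its last occurrence.
object LastWinsDedup {
  def dedup[K, V](entries: Seq[(K, V)]): Seq[(K, V)] = {
    val keyToIndex = mutable.HashMap.empty[K, Int]
    val keys = mutable.ArrayBuffer.empty[K]
    val values = mutable.ArrayBuffer.empty[V]
    entries.foreach { case (k, v) =>
      keyToIndex.get(k) match {
        case Some(i) =>
          // Duplicate key: overwrite the earlier value.
          values(i) = v
        case None =>
          keyToIndex.put(k, values.length)
          keys += k
          values += v
      }
    }
    keys.zip(values).toSeq
  }
}
```

For example, `LastWinsDedup.dedup(Seq(1 -> "a", 2 -> "b", 1 -> "c"))` keeps two entries, and key `1` maps to `"c"`.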


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-28 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/23124


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-28 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236973308
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapData.scala ---
@@ -19,6 +19,12 @@ package org.apache.spark.sql.catalyst.util
 
 import java.util.{Map => JavaMap}
 
+/**
+ * A simple `MapData` implementation which is backed by 2 arrays.
+ *
+ * Note that, user is responsible to guarantee that the key array does not have duplicated
+ * elements, otherwise the behavior is undefined.
--- End diff --

nit: we might need to add the same note to the 3rd and 4th
`ArrayBasedMapData.apply()` methods.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236962499
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
+  assert(keyType != NullType, "map key cannot be null type.")
+
+  private lazy val keyToIndex = keyType match {
+    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
--- End diff --

I think we should prefer Java collections on performance-critical code paths.
Thanks for pointing it out!


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236955791
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
+  assert(keyType != NullType, "map key cannot be null type.")
+
+  private lazy val keyToIndex = keyType match {
+    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
--- End diff --

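We need to exclude `BinaryType` from the `AtomicType` case here: binary keys are
byte arrays, which a hash map compares by reference rather than by content.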
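
A minimal sketch of why byte-array keys do not belong in the hash-map branch, written in plain Scala with no Spark dependency. `BinaryKeyDemo` and `byteArrayOrdering` are hypothetical names for illustration only; the ordering compares signed bytes lexicographically and is not claimed to match the ordering Spark uses for binary values.

```scala
import scala.collection.mutable

object BinaryKeyDemo {
  // Hypothetical content-based ordering over byte arrays (illustration only).
  val byteArrayOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n) {
        val c = java.lang.Byte.compare(a(i), b(i))
        if (c != 0) return c
        i += 1
      }
      a.length - b.length
    }
  }

  // JVM arrays use identity-based equals/hashCode, so two arrays with the
  // same content are treated as different keys by a hash map.
  def hashMapSize(keys: Seq[Array[Byte]]): Int = {
    val m = mutable.HashMap.empty[Any, Int]
    keys.zipWithIndex.foreach { case (k, i) => m.put(k, i) }
    m.size
  }

  // A tree map with a content-based ordering treats them as the same key.
  def treeMapSize(keys: Seq[Array[Byte]]): Int = {
    val m = mutable.TreeMap.empty[Array[Byte], Int](byteArrayOrdering)
    keys.zipWithIndex.foreach { case (k, i) => m.put(k, i) }
    m.size
  }
}
```

With two distinct arrays that both hold the bytes `1, 2`, `hashMapSize` reports two entries while `treeMapSize` reports one, which is the behavior a duplicate-removing builder needs.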


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread ueshin
Github user ueshin commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236958252
  
--- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilderSuite.scala ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{UnsafeArrayData, UnsafeRow}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, StructType}
+import org.apache.spark.unsafe.Platform
+
+class ArrayBasedMapBuilderSuite extends SparkFunSuite {
+
+  test("basic") {
+    val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+    builder.put(1, 1)
+    builder.put(InternalRow(2, 2))
+    builder.putAll(new GenericArrayData(Seq(3)), new GenericArrayData(Seq(3)))
+    val map = builder.build()
+    assert(map.numElements() == 3)
+    assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 1, 2 -> 2, 3 -> 3))
+  }
+
+  test("fail with null key") {
+    val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+    builder.put(1, null) // null value is OK
+    val e = intercept[RuntimeException](builder.put(null, 1))
+    assert(e.getMessage.contains("Cannot use null as map key"))
+  }
+
+  test("remove duplicated keys with last wins policy") {
+    val builder = new ArrayBasedMapBuilder(IntegerType, IntegerType)
+    builder.put(1, 1)
+    builder.put(2, 2)
+    builder.put(1, 2)
+    val map = builder.build()
+    assert(map.numElements() == 2)
+    assert(ArrayBasedMapData.toScalaMap(map) == Map(1 -> 2, 2 -> 2))
+  }
+
+  test("struct type key") {
+    val builder = new ArrayBasedMapBuilder(new StructType().add("i", "int"), IntegerType)
+    builder.put(InternalRow(1), 1)
+    builder.put(InternalRow(2), 2)
+    val unsafeRow = {
+      val row = new UnsafeRow(1)
+      val bytes = new Array[Byte](16)
+      row.pointTo(bytes, 16)
+      row.setInt(0, 1)
+      row
+    }
+    builder.put(unsafeRow, 3)
+    val map = builder.build()
+    assert(map.numElements() == 2)
+    assert(ArrayBasedMapData.toScalaMap(map) == Map(InternalRow(1) -> 3, InternalRow(2) -> 2))
+  }
+
+  test("array type key") {
+    val builder = new ArrayBasedMapBuilder(ArrayType(IntegerType), IntegerType)
+    builder.put(new GenericArrayData(Seq(1, 1)), 1)
+    builder.put(new GenericArrayData(Seq(2, 2)), 2)
+    val unsafeArray = {
+      val array = new UnsafeArrayData()
+      val bytes = new Array[Byte](24)
+      Platform.putLong(bytes, Platform.BYTE_ARRAY_OFFSET, 2)
+      array.pointTo(bytes, Platform.BYTE_ARRAY_OFFSET, 24)
+      array.setInt(0, 1)
+      array.setInt(1, 1)
+      array
+    }
+    builder.put(unsafeArray, 3)
+    val map = builder.build()
+    assert(map.numElements() == 2)
+    assert(ArrayBasedMapData.toScalaMap(map) ==
+      Map(new GenericArrayData(Seq(1, 1)) -> 3, new GenericArrayData(Seq(2, 2)) -> 2))
+  }
--- End diff --

Should we have a binary type key test as well?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread bersprockets
Github user bersprockets commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236952729
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types._
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map cannot be/contain map")
+  assert(keyType != NullType, "map key cannot be null type.")
+
+  private lazy val keyToIndex = keyType match {
+    case _: AtomicType | _: CalendarIntervalType => mutable.HashMap.empty[Any, Int]
--- End diff --

FYI: I had a test lying around from when I worked on map_concat. With this 
PR:

- map_concat of two small maps (20 string keys per map, no dups) for 2M 
rows is about 17% slower.
- map_concat of two big maps (500 string keys per map, no dups) for 1M rows 
is about 25% slower.

The baseline code is the same branch as the PR, but without the 4 commits.

Some cost makes sense, as we're checking for dups, but it's odd that the 
overhead grows disproportionately as the size of the maps grows.


I remember that at one time, mutable.HashMap had some performance issues
(rumor has it, anyway). So as a test, I modified ArrayBasedMapBuilder.scala to
use java.util.HashMap instead. After that:

- map_concat of two small maps (20 string keys per map, no dups) for 2M 
rows is about 12% slower.
- map_concat of two big maps (500 string keys per map, no dups) for 1M rows 
is about 15% slower.

It's a little more proportionate. I don't know if switching HashMap 
implementations would have some negative consequences.

Also, my test is a dumb benchmark that uses System.currentTimeMillis to time
the concatenation of simple [String, Integer] maps.
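
A crude way to compare the two hash map implementations outside Spark might look like the sketch below. It is not the benchmark used above: `HashMapTiming` and its methods are hypothetical names, the workload is only key insertion, and a single `System.nanoTime` measurement without JIT warm-up is noisy, so the numbers are indicative at best.

```scala
import java.util.{HashMap => JHashMap}
import scala.collection.mutable

object HashMapTiming {
  // Runs `body` once and returns the elapsed wall-clock time in nanoseconds.
  def timeNanos[A](body: => A): Long = {
    val start = System.nanoTime()
    body
    System.nanoTime() - start
  }

  // Inserts every key into a scala.collection.mutable.HashMap and returns
  // the number of distinct keys.
  def fillScala(keys: Array[String]): Int = {
    val m = mutable.HashMap.empty[String, Int]
    var i = 0
    while (i < keys.length) {
      m.put(keys(i), i)
      i += 1
    }
    m.size
  }

  // Same workload against java.util.HashMap.
  def fillJava(keys: Array[String]): Int = {
    val m = new JHashMap[String, Integer]()
    var i = 0
    while (i < keys.length) {
      m.put(keys(i), Integer.valueOf(i))
      i += 1
    }
    m.size()
  }
}
```

Calling `HashMapTiming.timeNanos(HashMapTiming.fillScala(keys))` and the `fillJava` equivalent in a loop, after a few warm-up rounds, gives a rough comparison; a harness such as JMH would be more trustworthy.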





---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-27 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236949897
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
@@ -558,8 +558,11 @@ private[parquet] class ParquetRowConverter(
 
     override def getConverter(fieldIndex: Int): Converter = keyValueConverter
 
-    override def end(): Unit =
+    override def end(): Unit = {
+      // The parquet map may contains null or duplicated map keys. When it happens, the behavior is
+      // undefined.
--- End diff --

done


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236376102
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala ---
@@ -89,7 +89,7 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
     val msg1 = intercept[Exception] {
       df5.select(map_from_arrays($"k", $"v")).collect
     }.getMessage
-    assert(msg1.contains("Cannot use null as map key!"))
+    assert(msg1.contains("Cannot use null as map key"))
--- End diff --

The message at line 98 has also changed now.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236284636
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
+  }
+
+  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData 
= {
+assert(keyToIndex.isEmpty, "'from' can only be called with a fresh 
GenericMapBuilder.")
+putAll(keyArray, valueArray)
--- End diff --

Ah, you are right.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236282401
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
+  }
+
+  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData 
= {
+assert(keyToIndex.isEmpty, "'from' can only be called with a fresh 
GenericMapBuilder.")
+putAll(keyArray, valueArray)
--- End diff --

No, we can't, as we still need to detect duplicated keys.


---


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236275822
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-    val maps = children.map(_.eval(input))
+    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

`toArray` is O(N), but we do it only once. If accessing by index is O(N),
the total time complexity is O(N^2).
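
The trade-off can be sketched in plain Scala. The names `IndexedAccess`, `sumByIndex`, and `sumViaArray` are hypothetical, and summing integers stands in for whatever per-element work the real loop does.

```scala
object IndexedAccess {
  // Index-based loop directly on the input. On a linked List, both
  // xs.length and xs(i) are linear, so the whole loop is O(N^2).
  def sumByIndex(xs: Seq[Int]): Long = {
    var total = 0L
    var i = 0
    while (i < xs.length) {
      total += xs(i)
      i += 1
    }
    total
  }

  // One O(N) copy up front, then O(1) indexed access: O(N) overall.
  def sumViaArray(xs: Seq[Int]): Long = {
    val arr = xs.toArray
    var total = 0L
    var i = 0
    while (i < arr.length) {
      total += arr(i)
      i += 1
    }
    total
  }
}
```

Both methods return the same result; they differ only in cost when the input `Seq` does not support constant-time indexing.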


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236274428
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, users can create map values with map 
type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since 
Spark 3.0, it's not allowed to create map values with map type key with these 
built-in functions. Users can still read map values with map type key from data 
source or Java/Scala collections, though they are not very useful.
 
+  - In Spark version 2.4 and earlier, users can create a map with 
duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. 
The behavior of map with duplicated keys is undefined, e.g. map look up 
respects the duplicated key appears first, `Dataset.collect` only keeps the 
duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since 
Spark 3.0, these built-in functions will remove duplicated map keys with last 
wins policy.
--- End diff --

They are related, but they are not the same. For example, we don't support
map type as key, because we can't check equality of map type correctly. This is
just a current implementation limitation, and we may relax it in the future.

Duplicated map keys are a real problem, and we will never allow them.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236171035
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
 
 val mapMerge =
   s"""
-|${ev.isNull} = $hasNullName;
-|if (!${ev.isNull}) {
-|  $arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  $arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  long $numElementsName = 0;
-|  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) 
{
-|$keyArgsName[$idxName] = $argsName[$idxName].keyArray();
-|$valArgsName[$idxName] = $argsName[$idxName].valueArray();
-|$numElementsName += $argsName[$idxName].numElements();
-|  }
-|  if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
-|throw new RuntimeException("Unsuccessful attempt to concat 
maps with " +
-|   $numElementsName + " elements due to exceeding the map 
size limit " +
-|   "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
-|  }
-|  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
-|(int) $numElementsName);
-|  $arrayDataClass $finValsName = $valueConcat($valArgsName,
-|(int) $numElementsName);
-|  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, 
$finValsName);
+|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
+|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
+|long $numElementsName = 0;
+|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
+|  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
+|  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
+|  $numElementsName += $argsName[$idxName].numElements();
 |}
+|if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
--- End diff --

I see. I agree with doing it in a follow-up, thanks.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-26 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236170759
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-    val maps = children.map(_.eval(input))
+    val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

Yes, but converting `toArray` may require an extra O(N) operation for the 
copy, so I am not sure the difference between `while` and `foreach` is 
significant enough to cover the overhead of the copy...


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236112390
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala ---
@@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) extends ComplexTypeMergingExpres
 
 val mapMerge =
   s"""
-|${ev.isNull} = $hasNullName;
-|if (!${ev.isNull}) {
-|  $arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  $arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  long $numElementsName = 0;
-|  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) 
{
-|$keyArgsName[$idxName] = $argsName[$idxName].keyArray();
-|$valArgsName[$idxName] = $argsName[$idxName].valueArray();
-|$numElementsName += $argsName[$idxName].numElements();
-|  }
-|  if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
-|throw new RuntimeException("Unsuccessful attempt to concat 
maps with " +
-|   $numElementsName + " elements due to exceeding the map 
size limit " +
-|   "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
-|  }
-|  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
-|(int) $numElementsName);
-|  $arrayDataClass $finValsName = $valueConcat($valArgsName,
-|(int) $numElementsName);
-|  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, 
$finValsName);
+|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
+|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
+|long $numElementsName = 0;
+|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
+|  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
+|  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
+|  $numElementsName += $argsName[$idxName].numElements();
 |}
+|if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
--- End diff --

Yup. I actually did what you proposed at first, and then realized it's 
different from the previous behavior and may introduce a perf regression. We 
can investigate it in a follow-up.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236112256
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

BTW, if it's not true anymore with Scala 2.12, we should update them all 
together with a benchmark, instead of only updating this single one.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-25 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r236112177
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

In Scala, a `while` loop is faster than `foreach`. If you look at the 
`Expression.eval` implementations, we use `while` loops a lot, even where 
`foreach` would produce simpler code.
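
To make the comparison concrete, here is a minimal sketch of the two loop 
styles being discussed. It is not taken from the PR: `LoopSketch` and both 
method names are hypothetical, and the input is a plain `Array[Int]` or 
`Seq[Int]` rather than Spark's `MapData`. Whether the difference is 
measurable depends on the Scala version and the JIT, so it should be 
benchmarked rather than assumed.

```scala
object LoopSketch {
  // Index-based while loop over an array: the style used in many
  // Expression.eval implementations.
  def totalWithWhile(sizes: Array[Int]): Long = {
    var total = 0L
    var i = 0
    while (i < sizes.length) {
      total += sizes(i)
      i += 1
    }
    total
  }

  // Equivalent foreach: shorter, but each element is typically handled
  // through a closure call.
  def totalWithForeach(sizes: Seq[Int]): Long = {
    var total = 0L
    sizes.foreach(n => total += n)
    total
  }
}
```

Both methods return the same sum; the `while` version needs indexed access, 
which is why the diff converts the evaluated children with `toArray`.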


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235999222
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
--- End diff --

After merging this PR, I'll check again and file a JIRA for that.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235999040
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
--- End diff --

Sure.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235952965
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
--- End diff --



Is it better to call reset() after calling new ArrayBasedMapData to reduce 
memory consumption in the Java heap?

On the caller side, ArrayBasedMapBuilder is not released. Therefore, until 
reset() is called the next time, each ArrayBasedMapBuilder keeps unused data 
in keys, values, and keyToIndex. They consume Java heap unexpectedly.



---


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235950666
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
+  }
+
+  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData 
= {
+assert(keyToIndex.isEmpty, "'from' can only be called with a fresh 
GenericMapBuilder.")
+putAll(keyArray, valueArray)
+if (keyToIndex.size == keyArray.numElements()) {
+  // If there is no duplicated map keys, creates the MapData with the 
input key and value array,
+  // as they might already in unsafe format and are more efficient.
+  new 

[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235950148
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
+  }
--- End diff --

Is it better to call `reset()` after calling `new ArrayBasedMapData` to 
reduce memory consumption?

On the caller side, `ArrayBasedMapBuilder` is not released. Therefore, until 
`reset()` is called the next time, each `ArrayBasedMapBuilder` keeps unused 
data in `keys`, `values`, and `keyToIndex`. They consume Java heap 
unexpectedly.
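
The suggestion can be sketched as follows. This is a simplified stand-in, 
not the PR's class: `LastWinsBuilder` and `bufferedEntries` are hypothetical 
names, plain Scala collections replace `ArrayData` and `ArrayBasedMapData`, 
and the null-key check is omitted for brevity.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for ArrayBasedMapBuilder.
class LastWinsBuilder[K, V] {
  private val keyToIndex = mutable.HashMap.empty[K, Int]
  private val keys = mutable.ArrayBuffer.empty[K]
  private val values = mutable.ArrayBuffer.empty[V]

  def reset(): Unit = {
    keyToIndex.clear()
    keys.clear()
    values.clear()
  }

  def put(key: K, value: V): Unit = {
    keyToIndex.get(key) match {
      // Last wins: overwrite the value stored for the existing key.
      case Some(idx) => values(idx) = value
      case None =>
        keyToIndex.put(key, values.length)
        keys += key
        values += value
    }
  }

  // Copy the buffered entries out, then clear the buffers so the builder
  // does not keep them reachable until the next explicit reset().
  def build(): (Vector[K], Vector[V]) = {
    val result = (keys.toVector, values.toVector)
    reset()
    result
  }

  // Number of entries currently held by the builder.
  def bufferedEntries: Int = keys.length
}
```

With this shape a caller no longer has to remember to call `reset()` before 
reusing the builder. Note that clearing a buffer drops the references to the 
entries but may keep the backing capacity allocated.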


---


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235947044
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
+  }
+
+  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData 
= {
+assert(keyToIndex.isEmpty, "'from' can only be called with a fresh 
GenericMapBuilder.")
+putAll(keyArray, valueArray)
--- End diff --

Can we call `new ArrayBasedMapData(keyArray, valueArray)` without calling 
`putAll(keyArray, valueArray)` if 
`keyArray.asInstanceOf[ArrayData].containsNull` is false?


---


[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235943290
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -751,171 +739,46 @@ case class MapFromEntries(child: Expression) extends 
UnaryExpression {
   s"${child.dataType.catalogString} type. $prettyName accepts only 
arrays of pair structs.")
   }
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override protected def nullSafeEval(input: Any): Any = {
-val arrayData = input.asInstanceOf[ArrayData]
-val numEntries = arrayData.numElements()
+val entries = input.asInstanceOf[ArrayData]
+val numEntries = entries.numElements()
 var i = 0
-if(nullEntries) {
+if (nullEntries) {
   while (i < numEntries) {
-if (arrayData.isNullAt(i)) return null
+if (entries.isNullAt(i)) return null
 i += 1
   }
 }
-val keyArray = new Array[AnyRef](numEntries)
-val valueArray = new Array[AnyRef](numEntries)
+
+mapBuilder.reset()
 i = 0
 while (i < numEntries) {
-  val entry = arrayData.getStruct(i, 2)
-  val key = entry.get(0, dataType.keyType)
-  if (key == null) {
-throw new RuntimeException("The first field from a struct (key) 
can't be null.")
-  }
-  keyArray.update(i, key)
-  val value = entry.get(1, dataType.valueType)
-  valueArray.update(i, value)
+  mapBuilder.put(entries.getStruct(i, 2))
   i += 1
 }
-ArrayBasedMapData(keyArray, valueArray)
+mapBuilder.build()
   }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
 nullSafeCodeGen(ctx, ev, c => {
   val numEntries = ctx.freshName("numEntries")
-  val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType)
-  val isValuePrimitive = 
CodeGenerator.isPrimitiveType(dataType.valueType)
-  val code = if (isKeyPrimitive && isValuePrimitive) {
-genCodeForPrimitiveElements(ctx, c, ev.value, numEntries)
--- End diff --

This change allows us to focus on optimizing `ArrayBasedMapBuilder`.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235932502
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

Well, my understanding is that we could do a `maps.foreach` instead of 
accessing them by index. I don't see the index as significant at all, but 
maybe I am missing something...


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235931894
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
 val mapMerge =
   s"""
-|${ev.isNull} = $hasNullName;
-|if (!${ev.isNull}) {
-|  $arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  $arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  long $numElementsName = 0;
-|  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) 
{
-|$keyArgsName[$idxName] = $argsName[$idxName].keyArray();
-|$valArgsName[$idxName] = $argsName[$idxName].valueArray();
-|$numElementsName += $argsName[$idxName].numElements();
-|  }
-|  if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
-|throw new RuntimeException("Unsuccessful attempt to concat 
maps with " +
-|   $numElementsName + " elements due to exceeding the map 
size limit " +
-|   "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
-|  }
-|  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
-|(int) $numElementsName);
-|  $arrayDataClass $finValsName = $valueConcat($valArgsName,
-|(int) $numElementsName);
-|  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, 
$finValsName);
+|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
+|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
+|long $numElementsName = 0;
+|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
+|  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
+|  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
+|  $numElementsName += $argsName[$idxName].numElements();
 |}
+|if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
--- End diff --

Yes, but we could do the `putAll` first and only fail once we actually reach 
the limit. We can maybe do that in a follow-up, though, as the current code 
does not introduce any regression.
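
The difference between the two strategies can be sketched as below. This is 
an illustration under stated assumptions, not the PR's generated code: 
`ConcatLimitSketch`, both method names, and the tiny `MaxEntries` limit are 
hypothetical (Spark uses `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`), and 
maps are modeled as sequences of key/value pairs.

```scala
import scala.collection.mutable

object ConcatLimitSketch {
  // Hypothetical small limit, chosen only to keep the example readable.
  val MaxEntries = 4

  // Up-front check: fail if the summed input sizes exceed the limit, even
  // when removing duplicate keys would bring the result under it.
  def concatCheckFirst(maps: Seq[Seq[(String, Int)]]): Map[String, Int] = {
    val total = maps.map(_.size.toLong).sum
    if (total > MaxEntries) {
      throw new RuntimeException(s"Cannot concat maps with $total elements.")
    }
    maps.flatten.toMap // later pairs overwrite earlier ones
  }

  // Deferred check: merge with last-wins semantics and fail only when the
  // number of distinct keys exceeds the limit.
  def concatCheckWhileMerging(maps: Seq[Seq[(String, Int)]]): Map[String, Int] = {
    val merged = mutable.LinkedHashMap.empty[String, Int]
    for (m <- maps; (k, v) <- m) {
      merged.put(k, v)
      if (merged.size > MaxEntries) {
        throw new RuntimeException(s"Map size exceeds the limit $MaxEntries.")
      }
    }
    merged.toMap
  }
}
```

For two three-entry inputs that share all of their keys, the up-front check 
rejects the concat (6 > 4) while the deferred check accepts it (3 distinct 
keys). The deferred version pays for a size comparison on every insert, which 
is the kind of cost a follow-up would need to measure.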


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235931588
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
--- End diff --

Oh I see now, I missed it, thanks.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235929748
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, Int](TypeUtils.getInterpretedOrdering(keyType))
--- End diff --

I think we should fail it in the analyzer phase, and other map-producing 
functions should do the same. Can you create a JIRA for it? Thanks!


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235929210
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
--- End diff --

ah good catch!


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235928954
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
--- End diff --

There are 2 `put` methods that have this null check, and all the other put 
methods go through them.
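
To illustrate the delegation described above, here is a minimal sketch. `SketchMapBuilder` is a hypothetical, simplified stand-in, not the actual `ArrayBasedMapBuilder`: it keeps the null check in `put(key, value)` only, and the bulk `putAll` inherits the check by calling `put`.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the builder under review (not the Spark class).
class SketchMapBuilder {
  private val keyToIndex = mutable.HashMap.empty[Any, Int]
  private val keys = mutable.ArrayBuffer.empty[Any]
  private val values = mutable.ArrayBuffer.empty[Any]

  // The null check lives in this entry point.
  def put(key: Any, value: Any): Unit = {
    if (key == null) {
      throw new RuntimeException("Cannot use null as map key.")
    }
    keyToIndex.get(key) match {
      case Some(idx) =>
        // Key already seen: overwrite the value (last wins).
        values(idx) = value
      case None =>
        keyToIndex.put(key, values.length)
        keys += key
        values += value
    }
  }

  // No explicit null check here: every entry goes through put(key, value).
  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
    if (keyArray.length != valueArray.length) {
      throw new RuntimeException("The key array and value array must have the same length.")
    }
    var i = 0
    while (i < keyArray.length) {
      put(keyArray(i), valueArray(i))
      i += 1
    }
  }

  // Entries in first-insertion order, with the latest value per key.
  def result(): List[(Any, Any)] = keys.zip(values).toList
}
```

With this shape, a null key passed through `putAll` fails with the same error as a null key passed to `put` directly.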


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235928588
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
 val mapMerge =
   s"""
-|${ev.isNull} = $hasNullName;
-|if (!${ev.isNull}) {
-|  $arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  $arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  long $numElementsName = 0;
-|  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) 
{
-|$keyArgsName[$idxName] = $argsName[$idxName].keyArray();
-|$valArgsName[$idxName] = $argsName[$idxName].valueArray();
-|$numElementsName += $argsName[$idxName].numElements();
-|  }
-|  if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
-|throw new RuntimeException("Unsuccessful attempt to concat 
maps with " +
-|   $numElementsName + " elements due to exceeding the map 
size limit " +
-|   "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
-|  }
-|  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
-|(int) $numElementsName);
-|  $arrayDataClass $finValsName = $valueConcat($valArgsName,
-|(int) $numElementsName);
-|  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, 
$finValsName);
+|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
+|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
+|long $numElementsName = 0;
+|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
+|  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
+|  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
+|  $numElementsName += $argsName[$idxName].numElements();
 |}
+|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
--- End diff --

This check is done before the `putAll`, so that it can fail fast. I think 
it's fine to ignore duplicated keys here, to make it more conservative.
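
As a rough sketch of the fail-fast idea (plain Scala collections instead of `MapData`; `FailFastConcat` and its `maxSize` parameter are assumptions standing in for the generated code and `ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`): the entry counts are summed up front, duplicates included, and the merge only runs if that upper bound fits.

```scala
// Hypothetical helper for illustration only; not part of Spark.
object FailFastConcat {
  def concat(maps: Seq[Seq[(String, Int)]], maxSize: Long): Map[String, Int] = {
    // Upper bound on the result size: every entry counts, duplicates included.
    val numElements = maps.iterator.map(_.size.toLong).sum
    if (numElements > maxSize) {
      throw new RuntimeException(
        s"Unsuccessful attempt to concat maps with $numElements elements " +
          s"due to exceeding the map size limit $maxSize.")
    }
    // Only now pay for the merge; later entries overwrite earlier ones (last wins).
    maps.flatten.toMap
  }
}
```

The check is conservative in the sense that an input whose deduplicated result would fit can still be rejected, but it never lets an oversized result through and it costs nothing beyond a sum.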


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235927895
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

I need to access it by index below, so I turn it into an array so that the 
access is guaranteed to be O(1).
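
A small sketch of the concern, using made-up names (`IndexedAccess`, `totalByIndex`) and plain integers in place of evaluated expressions: `map` on a `Seq` may return a linked `List`, where `apply(i)` walks `i` nodes, while an `Array` gives constant-time indexed access regardless of what the caller passed in.

```scala
// Hypothetical example; the Int elements stand in for evaluated child expressions.
object IndexedAccess {
  def totalByIndex(children: Seq[Int]): Long = {
    // Without toArray the result keeps the input's collection type,
    // e.g. a List, whose apply(i) is O(i).
    val evaluated: Array[Int] = children.map(_ * 2).toArray
    var sum = 0L
    var i = 0
    while (i < evaluated.length) {
      sum += evaluated(i) // O(1) on an Array
      i += 1
    }
    sum
  }
}
```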


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235879585
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
--- End diff --

Is this method used anywhere? It looks like only the other `putAll` below is used.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235872502
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, users can create map values with map 
type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since 
Spark 3.0, it's not allowed to create map values with map type key with these 
built-in functions. Users can still read map values with map type key from data 
source or Java/Scala collections, though they are not very useful.
 
+  - In Spark version 2.4 and earlier, users can create a map with 
duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. 
The behavior of map with duplicated keys is undefined, e.g. map look up 
respects the duplicated key appears first, `Dataset.collect` only keeps the 
duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since 
Spark 3.0, these built-in functions will remove duplicated map keys with last 
wins policy.
--- End diff --

Similar to the above, shall we also mention that maps with duplicated keys 
can still be read from data sources?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235870945
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+put(keyGetter(entry, 0), valueGetter(entry, 1))
+  }
+
+  def putAll(keyArray: Array[Any], valueArray: Array[Any]): Unit = {
+if (keyArray.length != valueArray.length) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.length) {
+  put(keyArray(i), valueArray(i))
+  i += 1
+}
+  }
+
+  def putAll(keyArray: ArrayData, valueArray: ArrayData): Unit = {
+if (keyArray.numElements() != valueArray.numElements()) {
+  throw new RuntimeException(
+"The key array and value array of MapData must have the same 
length.")
+}
+
+var i = 0
+while (i < keyArray.numElements()) {
+  put(keyGetter(keyArray, i), valueGetter(valueArray, i))
+  i += 1
+}
+  }
+
+  def build(): ArrayBasedMapData = {
+new ArrayBasedMapData(new GenericArrayData(keys.toArray), new 
GenericArrayData(values.toArray))
+  }
+
+  def from(keyArray: ArrayData, valueArray: ArrayData): ArrayBasedMapData 
= {
+assert(keyToIndex.isEmpty, "'from' can only be called with a fresh GenericMapBuilder.")
--- End diff --

`GenericMapBuilder`?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235866111
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -646,34 +633,35 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
 val mapMerge =
   s"""
-|${ev.isNull} = $hasNullName;
-|if (!${ev.isNull}) {
-|  $arrayDataClass[] $keyArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  $arrayDataClass[] $valArgsName = new 
$arrayDataClass[${mapCodes.size}];
-|  long $numElementsName = 0;
-|  for (int $idxName = 0; $idxName < $argsName.length; $idxName++) 
{
-|$keyArgsName[$idxName] = $argsName[$idxName].keyArray();
-|$valArgsName[$idxName] = $argsName[$idxName].valueArray();
-|$numElementsName += $argsName[$idxName].numElements();
-|  }
-|  if ($numElementsName > 
${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
-|throw new RuntimeException("Unsuccessful attempt to concat 
maps with " +
-|   $numElementsName + " elements due to exceeding the map 
size limit " +
-|   "${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}.");
-|  }
-|  $arrayDataClass $finKeysName = $keyConcat($keyArgsName,
-|(int) $numElementsName);
-|  $arrayDataClass $finValsName = $valueConcat($valArgsName,
-|(int) $numElementsName);
-|  ${ev.value} = new $arrayBasedMapDataClass($finKeysName, 
$finValsName);
+|ArrayData[] $keyArgsName = new ArrayData[${mapCodes.size}];
+|ArrayData[] $valArgsName = new ArrayData[${mapCodes.size}];
+|long $numElementsName = 0;
+|for (int $idxName = 0; $idxName < $argsName.length; $idxName++) {
+|  $keyArgsName[$idxName] = $argsName[$idxName].keyArray();
+|  $valArgsName[$idxName] = $argsName[$idxName].valueArray();
+|  $numElementsName += $argsName[$idxName].numElements();
 |}
+|if ($numElementsName > ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH}) {
--- End diff --

This check is not really correct, as we are not considering duplicates, 
IIUC. I think we can change this behavior by using `putAll` and checking the 
size in the loop.
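
A minimal sketch of what checking the size in the loop could look like, assuming plain Scala collections rather than `MapData` (`CheckedPutAll` and `maxSize` are hypothetical names): the limit is compared against the number of distinct keys seen so far, so duplicates no longer count against it.

```scala
import scala.collection.mutable

// Hypothetical helper for illustration only; not part of Spark.
object CheckedPutAll {
  def concat(maps: Seq[Seq[(String, Int)]], maxSize: Int): Map[String, Int] = {
    val merged = mutable.LinkedHashMap.empty[String, Int]
    for (m <- maps; (k, v) <- m) {
      // A repeated key overwrites its value and does not grow the map (last wins).
      merged.put(k, v)
      if (merged.size > maxSize) {
        throw new RuntimeException(
          s"Unsuccessful attempt to concat maps due to exceeding the map size limit $maxSize.")
      }
    }
    merged.toMap
  }
}
```

The trade-off is that the failure is only detected after some of the merge work has already been done, whereas a check on the summed input sizes fails before any merging starts.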


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235865179
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -546,33 +546,29 @@ case class MapConcat(children: Seq[Expression]) 
extends ComplexTypeMergingExpres
 
   override def nullable: Boolean = children.exists(_.nullable)
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override def eval(input: InternalRow): Any = {
-val maps = children.map(_.eval(input))
+val maps = children.map(_.eval(input).asInstanceOf[MapData]).toArray
--- End diff --

Why do we need `toArray` here?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235867070
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
+  }
+
+  // TODO: specialize it
+  private lazy val keys = mutable.ArrayBuffer.empty[Any]
+  private lazy val values = mutable.ArrayBuffer.empty[Any]
+
+  private lazy val keyGetter = InternalRow.getAccessor(keyType)
+  private lazy val valueGetter = InternalRow.getAccessor(valueType)
+
+  def reset(): Unit = {
+keyToIndex.clear()
+keys.clear()
+values.clear()
+  }
+
+  def put(key: Any, value: Any): Unit = {
+if (key == null) {
+  throw new RuntimeException("Cannot use null as map key.")
+}
+
+val maybeExistingIdx = keyToIndex.get(key)
+if (maybeExistingIdx.isDefined) {
+  // Overwrite the previous value, as the policy is last wins.
+  values(maybeExistingIdx.get) = value
+} else {
+  keyToIndex.put(key, values.length)
+  keys.append(key)
+  values.append(value)
+}
+  }
+
+  // write a 2-field row, the first field is key and the second field is 
value.
+  def put(entry: InternalRow): Unit = {
+if (entry.isNullAt(0)) {
--- End diff --

This is checked only here and not in all the other `put` methods. I think we 
should be consistent and either always check it or never do it.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235852779
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
+
+  private lazy val keyToIndex = keyType match {
+case _: AtomicType | _: CalendarIntervalType => 
mutable.HashMap.empty[Any, Int]
+case _ =>
+  // for complex types, use interpreted ordering to be able to compare 
unsafe data with safe
+  // data, e.g. UnsafeRow vs GenericInternalRow.
+  mutable.TreeMap.empty[Any, 
Int](TypeUtils.getInterpretedOrdering(keyType))
--- End diff --

```scala
scala> sql("select map(null,2)")
res1: org.apache.spark.sql.DataFrame = [map(NULL, 2): map]

scala> sql("select map(null,2)").collect
scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
  at org.apache.spark.sql.catalyst.util.TypeUtils$.getInterpretedOrdering(TypeUtils.scala:67)
  at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.keyToIndex$lzycompute(ArrayBasedMapBuilder.scala:37)
```


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235851923
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -558,8 +558,11 @@ private[parquet] class ParquetRowConverter(
 
 override def getConverter(fieldIndex: Int): Converter = 
keyValueConverter
 
-override def end(): Unit =
+override def end(): Unit = {
+  // The parquet map may contains null or duplicated map keys. When it 
happens, the behavior is
+  // undefined.
--- End diff --

What about creating a Spark JIRA issue for this and embedding that ID here?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235851798
  
--- Diff: docs/sql-migration-guide-upgrade.md ---
@@ -19,6 +19,8 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, users can create map values with map 
type key via built-in function like `CreateMap`, `MapFromArrays`, etc. Since 
Spark 3.0, it's not allowed to create map values with map type key with these 
built-in functions. Users can still read map values with map type key from data 
source or Java/Scala collections, though they are not very useful.
 
+  - In Spark version 2.4 and earlier, users can create a map with 
duplicated keys via built-in functions like `CreateMap`, `StringToMap`, etc. 
The behavior of map with duplicated keys is undefined, e.g. map look up 
respects the duplicated key appears first, `Dataset.collect` only keeps the 
duplicated key appears last, `MapKeys` returns duplicated keys, etc. Since 
Spark 3.0, these built-in functions will remove duplicated map keys with last 
wins policy.
--- End diff --

Can we merge this with the sentence above at line 20? The two are different, 
but very closely related.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235851554
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ArrayBasedMapBuilder.scala
 ---
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.util
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{AtomicType, CalendarIntervalType, 
DataType, MapType}
+
+/**
+ * A builder of [[ArrayBasedMapData]], which fails if a null map key is 
detected, and removes
+ * duplicated map keys w.r.t. the last wins policy.
+ */
+class ArrayBasedMapBuilder(keyType: DataType, valueType: DataType) extends 
Serializable {
+  assert(!keyType.existsRecursively(_.isInstanceOf[MapType]), "key of map 
cannot be/contain map")
--- End diff --

Shall we add an assert to prevent `NullType` here, too?


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235849825
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala
 ---
@@ -751,171 +739,46 @@ case class MapFromEntries(child: Expression) extends 
UnaryExpression {
   s"${child.dataType.catalogString} type. $prettyName accepts only 
arrays of pair structs.")
   }
 
+  private lazy val mapBuilder = new ArrayBasedMapBuilder(dataType.keyType, 
dataType.valueType)
+
   override protected def nullSafeEval(input: Any): Any = {
-val arrayData = input.asInstanceOf[ArrayData]
-val numEntries = arrayData.numElements()
+val entries = input.asInstanceOf[ArrayData]
+val numEntries = entries.numElements()
 var i = 0
-if(nullEntries) {
+if (nullEntries) {
   while (i < numEntries) {
-if (arrayData.isNullAt(i)) return null
+if (entries.isNullAt(i)) return null
 i += 1
   }
 }
-val keyArray = new Array[AnyRef](numEntries)
-val valueArray = new Array[AnyRef](numEntries)
+
+mapBuilder.reset()
 i = 0
 while (i < numEntries) {
-  val entry = arrayData.getStruct(i, 2)
-  val key = entry.get(0, dataType.keyType)
-  if (key == null) {
-throw new RuntimeException("The first field from a struct (key) 
can't be null.")
-  }
-  keyArray.update(i, key)
-  val value = entry.get(1, dataType.valueType)
-  valueArray.update(i, value)
+  mapBuilder.put(entries.getStruct(i, 2))
   i += 1
 }
-ArrayBasedMapData(keyArray, valueArray)
+mapBuilder.build()
   }
 
   override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): 
ExprCode = {
 nullSafeCodeGen(ctx, ev, c => {
   val numEntries = ctx.freshName("numEntries")
-  val isKeyPrimitive = CodeGenerator.isPrimitiveType(dataType.keyType)
-  val isValuePrimitive = 
CodeGenerator.isPrimitiveType(dataType.valueType)
-  val code = if (isKeyPrimitive && isValuePrimitive) {
-genCodeForPrimitiveElements(ctx, c, ev.value, numEntries)
--- End diff --

Since we need to check for duplicated map keys, this trick no longer applies: 
we have to overwrite the value if the key has already appeared.


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/23124#discussion_r235849697
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/InternalRow.scala ---
@@ -125,22 +125,36 @@ object InternalRow {
* actually takes a `SpecializedGetters` input because it can be 
generalized to other classes
* that implements `SpecializedGetters` (e.g., `ArrayData`) too.
*/
-  def getAccessor(dataType: DataType): (SpecializedGetters, Int) => Any = 
dataType match {
-case BooleanType => (input, ordinal) => input.getBoolean(ordinal)
-case ByteType => (input, ordinal) => input.getByte(ordinal)
-case ShortType => (input, ordinal) => input.getShort(ordinal)
-case IntegerType | DateType => (input, ordinal) => 
input.getInt(ordinal)
-case LongType | TimestampType => (input, ordinal) => 
input.getLong(ordinal)
-case FloatType => (input, ordinal) => input.getFloat(ordinal)
-case DoubleType => (input, ordinal) => input.getDouble(ordinal)
-case StringType => (input, ordinal) => input.getUTF8String(ordinal)
-case BinaryType => (input, ordinal) => input.getBinary(ordinal)
-case CalendarIntervalType => (input, ordinal) => 
input.getInterval(ordinal)
-case t: DecimalType => (input, ordinal) => input.getDecimal(ordinal, 
t.precision, t.scale)
-case t: StructType => (input, ordinal) => input.getStruct(ordinal, 
t.size)
-case _: ArrayType => (input, ordinal) => input.getArray(ordinal)
-case _: MapType => (input, ordinal) => input.getMap(ordinal)
-case u: UserDefinedType[_] => getAccessor(u.sqlType)
-case _ => (input, ordinal) => input.get(ordinal, dataType)
+  def getAccessor(dt: DataType, nullable: Boolean = true): 
(SpecializedGetters, Int) => Any = {
--- End diff --

I can move it to a new PR if others think it's necessary. It's a little 
dangerous to ask the caller side to take care of null values.
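
As a rough sketch of the idea, the accessor itself can perform the null check 
when the field is nullable, so callers cannot forget it. This is not the code 
in the PR: `Getters` is a hypothetical minimal stand-in for 
`SpecializedGetters`, only the integer case is shown, and the assumption that 
`nullable = true` adds an `isNullAt` check is mine.

```scala
// Hypothetical minimal row interface, standing in for SpecializedGetters.
trait Getters {
  def isNullAt(ordinal: Int): Boolean
  def getInt(ordinal: Int): Int
}

object Accessors {
  // Returns an accessor for an integer field. When `nullable` is true, the
  // accessor checks isNullAt itself instead of relying on the caller.
  def intAccessor(nullable: Boolean = true): (Getters, Int) => Any = {
    val raw: (Getters, Int) => Any = (input, ordinal) => input.getInt(ordinal)
    if (nullable) {
      (input, ordinal) => if (input.isNullAt(ordinal)) null else raw(input, ordinal)
    } else {
      raw
    }
  }
}
```

The non-nullable branch skips the check entirely, so fields that are known to 
be non-null pay nothing for it.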


---




[GitHub] spark pull request #23124: [SPARK-25829][SQL] remove duplicated map keys wit...

2018-11-22 Thread cloud-fan
GitHub user cloud-fan opened a pull request:

https://github.com/apache/spark/pull/23124

[SPARK-25829][SQL] remove duplicated map keys with last wins policy

## What changes were proposed in this pull request?

Currently duplicated map keys are not handled consistently. For example, 
map lookup respects the duplicated key that appears first, `Dataset.collect` 
only keeps the duplicated key that appears last, `MapKeys` returns duplicated 
keys, etc.

This PR proposes to remove duplicated map keys with a last-wins policy, 
following Java/Scala and Presto. It only applies to built-in functions, as 
users can create maps with duplicated keys via private APIs anyway.
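
For reference, the last-wins behavior of plain Scala and Java maps can be seen 
in a small standalone snippet. It uses only the standard library and no Spark 
APIs; the object name `LastWinsDemo` is made up for this example.

```scala
object LastWinsDemo {
  // Scala's immutable Map keeps the value of the last occurrence of a key.
  val fromLiteral: Map[Int, String] = Map(1 -> "a", 2 -> "x", 1 -> "b")

  // java.util.HashMap.put overwrites the previous value for an existing key.
  val javaMap = new java.util.HashMap[Int, String]()
  javaMap.put(1, "a")
  javaMap.put(1, "b")
}
```

Here key `1` ends up mapped to `"b"` in both maps, which is the policy the 
built-in functions are meant to follow.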

For other places:
1. data source v1 doesn't have this problem, as users need to provide a 
java/scala map, which can't have duplicated keys.
2. data source v2 may have this problem. I've added a note to 
`ArrayBasedMapData` to ask the caller to take care of duplicated keys. In the 
future we should enforce it in the stable data APIs for data source v2.
3. UDF doesn't have this problem, as users need to provide a java/scala 
map. Same as data source v1.
4. file formats. I checked all of them, and only Parquet does not enforce it. 
For backward compatibility I change nothing, but leave a note saying that the 
behavior is undefined if users write maps with duplicated keys to Parquet 
files. Maybe we can add a config and fail by default if Parquet files contain 
maps with duplicated keys. This can be done in a follow-up.

## How was this patch tested?

updated tests and new tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/cloud-fan/spark map

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/23124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #23124


commit cbcd5d7a937f8120ef8527f1f26150ed93f1de0a
Author: Wenchen Fan 
Date:   2018-11-15T02:49:22Z

remove duplicated map keys with last wins policy




---
