[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/21968
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r215905122

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,12 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count {
--- End diff --

I think that we can simplify this by removing `case ...`, as @cloud-fan suggested.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214691997

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,8 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count(dt => !UnsafeRow.isFixedLength(dt))
--- End diff --

The code style doesn't matter; both are fine. But let's keep the comment.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user heary-cao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214502781

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,8 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count(dt => !UnsafeRow.isFixedLength(dt))
--- End diff --

@cloud-fan Could we discuss how this should be modified?
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214492145

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,8 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count(dt => !UnsafeRow.isFixedLength(dt))
--- End diff --

Please keep the comment `// TODO: consider large decimal and interval type` below.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214491914

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,8 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count(dt => !UnsafeRow.isFixedLength(dt))
--- End diff --

Super nit: `.count(!UnsafeRow.isFixedLength(_))`?
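For reference, a small self-contained sketch of the counting styles discussed in this thread (`keyTypes` is an illustrative stand-in for `groupingKeys.map(_.dataType)`; this is not the PR's code). One caveat: the bare placeholder form `count(!UnsafeRow.isFixedLength(_))` generally does not compile, since Scala expands `f(_)` into a function at the inner call rather than across the `!`, so a method-value variant is shown instead:

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

object CountVarLenFieldsSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for groupingKeys.map(_.dataType).
    val keyTypes: Seq[DataType] = Seq(IntegerType, StringType)

    // Explicit-lambda style, as suggested by @cloud-fan:
    val a = keyTypes.count(dt => !UnsafeRow.isFixedLength(dt))

    // Method-value style that avoids the placeholder pitfall:
    val b = keyTypes.filterNot(UnsafeRow.isFixedLength).size

    assert(a == b) // both count variable-length fields (here: StringType -> 1)
  }
}
```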
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user heary-cao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214294682

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -130,6 +134,12 @@ class RowBasedHashMapGenerator(
       }
     }.mkString(";\n")
+    val nullByteWriter = if (groupingKeySchema.map(_.nullable).forall(_ == false)) {
--- End diff --

OK, thanks.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user heary-cao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214294339

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,12 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count {
--- End diff --

OK, thanks.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user heary-cao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214293612

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -141,11 +151,8 @@ class RowBasedHashMapGenerator(
        |if (buckets[idx] == -1) {
        |  if (numRows < capacity && !isBatchFull) {
        |    // creating the unsafe for new entry
-       |    org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
-       |      = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
-       |        ${groupingKeySchema.length}, ${numVarLenFields * 32});
        |    agg_rowWriter.reset(); //TODO: investigate if reset or zeroout are actually needed
--- End diff --

OK.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user heary-cao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214293534

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -141,11 +151,8 @@ class RowBasedHashMapGenerator(
        |if (buckets[idx] == -1) {
        |  if (numRows < capacity && !isBatchFull) {
        |    // creating the unsafe for new entry
--- End diff --

OK, updated. Thanks.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214246268

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -130,6 +134,12 @@ class RowBasedHashMapGenerator(
       }
     }.mkString(";\n")
+    val nullByteWriter = if (groupingKeySchema.map(_.nullable).forall(_ == false)) {
--- End diff --

Maybe name it `resetNullBits`?
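A minimal sketch of the conditional generation being named here (the `resetNullBits` helper and the emitted string are illustrative, not the exact PR code). Note that `groupingKeySchema.map(_.nullable).forall(_ == false)` can be written more directly as `!groupingKeySchema.exists(_.nullable)`:

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object ResetNullBitsSketch {
  // Emit the null-bits reset only when some grouping key is nullable; for an
  // all-non-nullable key schema the null bits are never set, so the per-entry
  // zeroOutNullBytes() call can be skipped in the generated code.
  def resetNullBits(groupingKeySchema: StructType): String =
    if (groupingKeySchema.exists(_.nullable)) "agg_rowWriter.zeroOutNullBytes();"
    else ""

  def main(args: Array[String]): Unit = {
    val nullableKey    = StructType(Seq(StructField("k", StringType, nullable = true)))
    val nonNullableKey = StructType(Seq(StructField("k", LongType, nullable = false)))
    assert(resetNullBits(nullableKey).nonEmpty)   // reset emitted
    assert(resetNullBits(nonNullableKey).isEmpty) // reset skipped
  }
}
```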
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214246211

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,12 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count {
--- End diff --

`groupingKeys.map(_.dataType).count(dt => !UnsafeRow.isFixedLength(dt))`
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214235758

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -141,11 +151,8 @@ class RowBasedHashMapGenerator(
        |if (buckets[idx] == -1) {
        |  if (numRows < capacity && !isBatchFull) {
        |    // creating the unsafe for new entry
-       |    org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
-       |      = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
-       |        ${groupingKeySchema.length}, ${numVarLenFields * 32});
        |    agg_rowWriter.reset(); //TODO: investigate if reset or zeroout are actually needed
--- End diff --

I think reset and zero-out are now needed, so maybe remove this TODO?
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r214235660

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -141,11 +151,8 @@ class RowBasedHashMapGenerator(
        |if (buckets[idx] == -1) {
        |  if (numRows < capacity && !isBatchFull) {
        |    // creating the unsafe for new entry
--- End diff --

Remove or update this comment?
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r212959634

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -141,9 +141,6 @@ class RowBasedHashMapGenerator(
        |if (buckets[idx] == -1) {
        |  if (numRows < capacity && !isBatchFull) {
        |    // creating the unsafe for new entry
-       |    org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter agg_rowWriter
-       |      = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
-       |        ${groupingKeySchema.length}, ${numVarLenFields * 32});
        |    agg_rowWriter.reset(); //TODO: investigate if reset or zeroout are actually needed
        |    agg_rowWriter.zeroOutNullBytes();
--- End diff --

Btw, if `groupingKeySchema` has no nullable field, can we drop `agg_rowWriter.zeroOutNullBytes()`?
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user maropu commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r212956571

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -48,6 +48,8 @@ class RowBasedHashMapGenerator(
     val keySchema = ctx.addReferenceObj("keySchemaTerm", groupingKeySchema)
     val valueSchema = ctx.addReferenceObj("valueSchemaTerm", bufferSchema)
+    val numVarLenFields = groupingKeys.map(_.dataType).count(!UnsafeRow.isFixedLength(_))
--- End diff --

Do not remove the `TODO` comment below.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r212909243

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -164,9 +164,8 @@ private[joins] class UnsafeHashedRelation(
   def getValue(key: InternalRow): InternalRow = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]
     val map = binaryMap // avoid the compiler error
-    val loc = new map.Location // this could be allocated in stack
-    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
-      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    val loc = map.lookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
--- End diff --

Before this PR, `loc` was allocated on each call of `getValue()`. After this PR, `loc` is shared within each `binaryMap` that is passed to the constructor of `UnsafeHashedRelation`. Is this behavior change safe?
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user heary-cao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r212898251

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -44,6 +44,12 @@ class RowBasedHashMapGenerator(
     groupingKeySchema, bufferSchema) {

   override protected def initializeAggregateHashMap(): String = {
+    val numVarLenFields = groupingKeys.map(_.dataType).count {
--- End diff --

That form could be used, but it is unrelated to this PR. Can we improve it in a follow-up PR?
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user heary-cao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r212896640

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -164,9 +164,8 @@ private[joins] class UnsafeHashedRelation(
   def getValue(key: InternalRow): InternalRow = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]
     val map = binaryMap // avoid the compiler error
-    val loc = new map.Location // this could be allocated in stack
-    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
-      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    val loc = map.lookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
--- End diff --

This lookup is safe, and it is different from `get(key: InternalRow)`.
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r212799711

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/RowBasedHashMapGenerator.scala ---
@@ -44,6 +44,12 @@ class RowBasedHashMapGenerator(
     groupingKeySchema, bufferSchema) {

   override protected def initializeAggregateHashMap(): String = {
+    val numVarLenFields = groupingKeys.map(_.dataType).count {
--- End diff --

Nit: can't this just be `.count(!UnsafeRow.isFixedLength(_))`?
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
Github user kiszk commented on a diff in the pull request:
https://github.com/apache/spark/pull/21968#discussion_r212569048

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala ---
@@ -164,9 +164,8 @@ private[joins] class UnsafeHashedRelation(
   def getValue(key: InternalRow): InternalRow = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]
     val map = binaryMap // avoid the compiler error
-    val loc = new map.Location // this could be allocated in stack
-    binaryMap.safeLookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
-      unsafeKey.getSizeInBytes, loc, unsafeKey.hashCode())
+    val loc = map.lookup(unsafeKey.getBaseObject, unsafeKey.getBaseOffset,
--- End diff --

IIUC, this change makes this part thread-unsafe. Is it OK?
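To make the concern concrete, here is a deliberately generic sketch of the hazard (hypothetical classes; this is not Spark's `BytesToBytesMap` API): when `lookup()` returns one shared mutable cursor, a second lookup can clobber the first caller's result before it is read.

```scala
// Hypothetical, simplified types to illustrate the shared-cursor race.
final class Location[V] { var value: Option[V] = None }

final class SharedCursorMap[K, V](underlying: Map[K, V]) {
  private val loc = new Location[V] // one cursor shared by every caller
  def lookup(key: K): Location[V] = {
    loc.value = underlying.get(key) // each lookup mutates the shared cursor
    loc
  }
}

object RaceSketch {
  def main(args: Array[String]): Unit = {
    val m = new SharedCursorMap(Map("k1" -> 1, "k2" -> 2))
    val a = m.lookup("k1") // a.value == Some(1) right now
    m.lookup("k2")         // another caller (or thread) moves the cursor
    println(a.value)       // Some(2): the stale read being asked about
  }
}
```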
[GitHub] spark pull request #21968: [SPARK-24999][SQL]Reduce unnecessary 'new' memory...
GitHub user heary-cao opened a pull request:
https://github.com/apache/spark/pull/21968
[SPARK-24999][SQL] Reduce unnecessary 'new' memory operations

## What changes were proposed in this pull request?

This PR changes the code generated by the fast hash map so that it no longer allocates a new block of memory for every new entry; the UnsafeRow's backing memory can be reused instead.

## How was this patch tested?

Existing test cases.

You can merge this pull request into a Git repository by running:
$ git pull https://github.com/heary-cao/spark updateNewMemory
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/21968.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #21968

commit c31597272c10c695d74a7a497ee76fd310026cfd
Author: caoxuewen
Date: 2018-08-02T11:50:44Z
Reduce unnecessary 'new' memory operations
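To make the reuse idea concrete, here is a minimal sketch (illustrative, not the generated code; the constructor arguments mirror `${groupingKeySchema.length}` and `${numVarLenFields * 32}` from the diffs above): one `UnsafeRowWriter` is created up front and rewound with `reset()` per entry, instead of constructing a fresh writer, and hence a fresh buffer, for every inserted key.

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
import org.apache.spark.unsafe.types.UTF8String

object ReuseWriterSketch {
  def main(args: Array[String]): Unit = {
    // One writer for all entries: 2 fields, 32 bytes of initial
    // variable-length space (cf. numVarLenFields * 32 in the PR).
    val writer = new UnsafeRowWriter(2, 32)

    def writeEntry(id: Long, name: String): UnsafeRow = {
      writer.reset()            // rewind the cursor; the buffer is reused
      writer.zeroOutNullBytes() // clear null bits left by the previous entry
      writer.write(0, id)
      writer.write(1, UTF8String.fromString(name))
      // Note: getRow() is a view over the shared buffer, so consume or copy
      // the row before the next reset().
      writer.getRow()
    }

    println(writeEntry(1L, "a"))
    println(writeEntry(2L, "bb"))
  }
}
```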