spark git commit: [SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 d1eac3ef4 -> 9873d57f2

[SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array

Waiting for merging #13680

This PR optimizes `SerializeFromObject()` for a primitive array. It is derived from #13758 and addresses one of the problems raised there in a simpler way.

The current implementation always generates `GenericArrayData` from `SerializeFromObject()` for any type of array in a logical plan. When `SerializeFromObject()` receives a primitive array, this involves boxing in the constructor of `GenericArrayData`.

This PR enables `SerializeFromObject()` to generate `UnsafeArrayData` for a primitive array, which avoids boxing when an instance of `ArrayData` is created in the code generated by Catalyst.

This PR also generates `UnsafeArrayData` in the case of `RowEncoder.serializeFor` or `CatalystTypeConverters.createToCatalystConverter`.

The performance improvement of `SerializeFromObject()` is up to 2.0x:

```
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without this PR
Write an array in Dataset:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
Int                                  556 /  608         15.1          66.3       1.0X
Double                              1668 / 1746          5.0         198.8       0.3X

With this PR
Write an array in Dataset:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
Int                                  352 /  401         23.8          42.0       1.0X
Double                               821 /  885         10.2          97.9       0.4X
```

Here is an example program of the kind that occurs in MLlib, as described in [SPARK-16070](https://issues.apache.org/jira/browse/SPARK-16070).
```
sparkContext.parallelize(Seq(Array(1, 2)), 1).toDS.map(e => e).show
```

Generated code before applying this PR

```java
protected void processNext() throws java.io.IOException {
  while (inputadapter_input.hasNext()) {
    InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    int[] inputadapter_value = (int[])inputadapter_row.get(0, null);

    Object mapelements_obj = ((Expression) references[0]).eval(null);
    scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;

    boolean mapelements_isNull = false || false;
    int[] mapelements_value = null;
    if (!mapelements_isNull) {
      Object mapelements_funcResult = null;
      mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
      if (mapelements_funcResult == null) {
        mapelements_isNull = true;
      } else {
        mapelements_value = (int[]) mapelements_funcResult;
      }

    }
    mapelements_isNull = mapelements_value == null;

    serializefromobject_argIsNulls[0] = mapelements_isNull;
    serializefromobject_argValue = mapelements_value;

    boolean serializefromobject_isNull = false;
    for (int idx = 0; idx < 1; idx++) {
      if (serializefromobject_argIsNulls[idx]) { serializefromobject_isNull = true; break; }
    }

    final ArrayData serializefromobject_value = serializefromobject_isNull ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
    serializefromobject_holder.reset();

    serializefromobject_rowWriter.zeroOutNullBytes();

    if (serializefromobject_isNull) {
      serializefromobject_rowWriter.setNullAt(0);
    } else {
      // Remember the current cursor so that we can calculate how many bytes are
      // written later.
      final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;

      if (serializefromobject_value instanceof UnsafeArrayData) {
        final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
        // grow the global buffer before writing data.
        serializefromobject_holder.grow(serializefromobject_sizeInBytes);
        ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
        serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
```
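
The boxing cost that the commit message attributes to `GenericArrayData` can be illustrated without Spark. The following is a minimal sketch using only the Java standard library; the class `ArrayBoxingSketch` and its methods are hypothetical names chosen for illustration and are not Spark APIs. It contrasts storing an `int[]` in an `Object[]` (one `Integer` per element) with copying the same values into one flat byte buffer, which is roughly the idea behind an unsafe, binary array layout.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration only: these helpers are NOT Spark APIs.
final class ArrayBoxingSketch {

  // Boxed path: every int becomes a java.lang.Integer, which is what storing
  // a primitive array in an Object[]-backed container implies.
  static Object[] toBoxed(int[] values) {
    Object[] boxed = new Object[values.length];
    for (int i = 0; i < values.length; i++) {
      boxed[i] = values[i]; // autoboxing via Integer.valueOf
    }
    return boxed;
  }

  // Flat path: elements are copied as raw 4-byte values into a single buffer,
  // so no per-element object is created.
  static byte[] toFlatBytes(int[] values) {
    ByteBuffer buffer = ByteBuffer.allocate(values.length * Integer.BYTES)
        .order(ByteOrder.LITTLE_ENDIAN);
    for (int v : values) {
      buffer.putInt(v);
    }
    return buffer.array();
  }

  // Reads element i back from the flat representation.
  static int readInt(byte[] flat, int i) {
    return ByteBuffer.wrap(flat)
        .order(ByteOrder.LITTLE_ENDIAN)
        .getInt(i * Integer.BYTES);
  }

  public static void main(String[] args) {
    int[] input = {1, 2};
    Object[] boxed = toBoxed(input);
    byte[] flat = toFlatBytes(input);
    System.out.println("boxed element is Integer: " + (boxed[1] instanceof Integer));
    System.out.println("flat size in bytes: " + flat.length);
    System.out.println("flat element 1: " + readInt(flat, 1));
  }
}
```

The sketch ignores details a real binary array format would need, such as a header and null tracking; it only shows why the flat path allocates one buffer while the boxed path allocates per element.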
spark git commit: [SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array
Repository: spark
Updated Branches:
  refs/heads/master 8f0ea011a -> 19cf20806

[SPARK-17490][SQL] Optimize SerializeFromObject() for a primitive array

## What changes were proposed in this pull request?

Waiting for merging #13680

This PR optimizes `SerializeFromObject()` for a primitive array. It is derived from #13758 and addresses one of the problems raised there in a simpler way.

The current implementation always generates `GenericArrayData` from `SerializeFromObject()` for any type of array in a logical plan. When `SerializeFromObject()` receives a primitive array, this involves boxing in the constructor of `GenericArrayData`.

This PR enables `SerializeFromObject()` to generate `UnsafeArrayData` for a primitive array, which avoids boxing when an instance of `ArrayData` is created in the code generated by Catalyst.

This PR also generates `UnsafeArrayData` in the case of `RowEncoder.serializeFor` or `CatalystTypeConverters.createToCatalystConverter`.

The performance improvement of `SerializeFromObject()` is up to 2.0x:

```
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 4.4.11-200.fc22.x86_64
Intel Xeon E3-12xx v2 (Ivy Bridge)

Without this PR
Write an array in Dataset:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
Int                                  556 /  608         15.1          66.3       1.0X
Double                              1668 / 1746          5.0         198.8       0.3X

With this PR
Write an array in Dataset:    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
Int                                  352 /  401         23.8          42.0       1.0X
Double                               821 /  885         10.2          97.9       0.4X
```

Here is an example program of the kind that occurs in MLlib, as described in [SPARK-16070](https://issues.apache.org/jira/browse/SPARK-16070).
```
sparkContext.parallelize(Seq(Array(1, 2)), 1).toDS.map(e => e).show
```

Generated code before applying this PR

```java
protected void processNext() throws java.io.IOException {
  while (inputadapter_input.hasNext()) {
    InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
    int[] inputadapter_value = (int[])inputadapter_row.get(0, null);

    Object mapelements_obj = ((Expression) references[0]).eval(null);
    scala.Function1 mapelements_value1 = (scala.Function1) mapelements_obj;

    boolean mapelements_isNull = false || false;
    int[] mapelements_value = null;
    if (!mapelements_isNull) {
      Object mapelements_funcResult = null;
      mapelements_funcResult = mapelements_value1.apply(inputadapter_value);
      if (mapelements_funcResult == null) {
        mapelements_isNull = true;
      } else {
        mapelements_value = (int[]) mapelements_funcResult;
      }

    }
    mapelements_isNull = mapelements_value == null;

    serializefromobject_argIsNulls[0] = mapelements_isNull;
    serializefromobject_argValue = mapelements_value;

    boolean serializefromobject_isNull = false;
    for (int idx = 0; idx < 1; idx++) {
      if (serializefromobject_argIsNulls[idx]) { serializefromobject_isNull = true; break; }
    }

    final ArrayData serializefromobject_value = serializefromobject_isNull ? null : new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_argValue);
    serializefromobject_holder.reset();

    serializefromobject_rowWriter.zeroOutNullBytes();

    if (serializefromobject_isNull) {
      serializefromobject_rowWriter.setNullAt(0);
    } else {
      // Remember the current cursor so that we can calculate how many bytes are
      // written later.
      final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;

      if (serializefromobject_value instanceof UnsafeArrayData) {
        final int serializefromobject_sizeInBytes = ((UnsafeArrayData) serializefromobject_value).getSizeInBytes();
        // grow the global buffer before writing data.
        serializefromobject_holder.grow(serializefromobject_sizeInBytes);
        ((UnsafeArrayData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
        serializefromobject_holder.cursor += s
```