This is an automated email from the ASF dual-hosted git repository.

imaxon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d6cf75  [ASTERIXDB-2895][RT] Vsize buffers in PyUDF IPC
8d6cf75 is described below

commit 8d6cf7574430de4f4049fbf5e6726cf2ede5d8aa
Author: Ian Maxon <ian@maxons.email>
AuthorDate: Tue Oct 26 00:15:48 2021 -0700

    [ASTERIXDB-2895][RT] Vsize buffers in PyUDF IPC
    
    - user mode changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    
    - Convert most uses of ByteBuffer to ArrayBackedValueStorage
      so that the size of the buffer can grow arbitrarily with
      the data
    - Convert ADM-to-Msgpack serialization to use IVisitablePointable
    - Convert all serialization interfaces that used ByteBuffer
      to use DataOutput instead
    - Fix UTF8 encoding bugs by using StandardToModifiedUTF8DataOutput
    - Adapt some of the UTF8 printing code to be used for
      UTF8 output to msgpack
    - Fix CSV output printer to not ignore surrogate pairs
    - Fix ASTERIXDB-29773 (returned records from PyUDF aren't sorted)
    
    Change-Id: Ic95e592b42139b4750af8bb20291f926b3c973e2
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12643
    Reviewed-by: Wael Alkowaileet <wael....@gmail.com>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Contrib: Ian Maxon <ima...@uci.edu>
---
 .../big_object_roundtrip_20M.1.ddl.sqlpp}          |  15 +-
 .../big_object_roundtrip_20M.2.update.sqlpp}       |  11 +-
 .../big_object_roundtrip_20M.3.lib.sqlpp}          |   8 +-
 .../big_object_roundtrip_20M.4.ddl.sqlpp}          |   9 +-
 .../big_object_roundtrip_20M.5.query.sqlpp}        |   9 +-
 .../py_nested_access.4.query.sqlpp                 |   3 +-
 .../type_validation.2.ddl.sqlpp                    |   3 +
 .../big_object_roundtrip_20M.1.adm.template        |   1 +
 .../mysentiment_twitter/mysentiment_twitter.13.adm |  42 +--
 .../resources/runtimets/testsuite_it_python.xml    |   5 +
 .../asterix/common/exceptions/ErrorCode.java       |   1 +
 .../src/main/resources/asx_errormsg/en.properties  |   1 +
 .../builders/ListLikeNumericArrayFactory.java}     |  25 +-
 .../StandardToModifiedUTF8DataOutputFactory.java   |  37 +++
 .../asterix/external/ipc/PythonIPCProto.java       | 163 ++++++++--
 .../asterix/external/ipc/PythonMessageBuilder.java |  63 ++--
 .../ExternalScalarPythonFunctionEvaluator.java     |  25 +-
 .../external/library/JavaFunctionHelper.java       |   8 +-
 .../library/{TypeInfo.java => JavaTypeInfo.java}   |   4 +-
 .../asterix/external/library/PyTypeInfo.java}      |  43 ++-
 .../external/library/PythonLibraryEvaluator.java   |  36 +-
 .../external/library/java/JObjectAccessors.java    |  10 +-
 .../library/java/JObjectPointableVisitor.java      |  13 +-
 .../library/msgpack/IMsgPackAccessor.java}         |  11 +-
 .../library/msgpack/IMsgPackListAccessor.java}     |  13 +-
 .../library/msgpack/IMsgPackRecordAccessor.java}   |  13 +-
 .../external/library/msgpack/MessagePackUtils.java |  73 +++++
 .../library/msgpack/MessagePackerFromADM.java      | 362 ---------------------
 .../library/msgpack/MessageUnpackerToADM.java      | 191 ++++++-----
 .../external/library/msgpack/MsgPackAccessors.java | 275 ++++++++++++++++
 .../library/msgpack/MsgPackPointableVisitor.java   |  77 +++++
 .../ExternalAssignBatchRuntimeFactory.java         |  77 ++---
 .../data/nontagged/printers/PrintTools.java        |  45 ++-
 33 files changed, 980 insertions(+), 692 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.ddl.sqlpp
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.ddl.sqlpp
index 8041b5e..d3bf31c 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.ddl.sqlpp
@@ -16,10 +16,15 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
 
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
+drop dataverse testdv2 if exists;
+create dataverse testdv2;
+use testdv2;
 
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+create type testtype as closed {
+  id: int64,
+  name: string,
+  hobbies: {{string}}
+};
+
+create dataset testds(testtype) primary key id;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.2.update.sqlpp
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.2.update.sqlpp
index 8041b5e..0353284 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.2.update.sqlpp
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
 
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
+use testdv2;
 
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+set `compiler.sortmemory` "64MB";
+
+load dataset testds
+using localfs
+(("path"="asterix_nc1://target/data/big-object/big_object_20M.adm"),("format"="adm"));
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.3.lib.sqlpp
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.3.lib.sqlpp
index 8041b5e..923dcb2 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.3.lib.sqlpp
@@ -16,10 +16,4 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
-
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
-
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+install testdv2 testlib python admin admin target/TweetSent.pyz
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.4.ddl.sqlpp
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.4.ddl.sqlpp
index 8041b5e..8b304fc 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.4.ddl.sqlpp
@@ -16,10 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
+ USE testdv2;
 
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
-
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+create function roundtrip(s) as "roundtrip",
+  "Tests.roundstr" at testlib;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.5.query.sqlpp
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.5.query.sqlpp
index 8041b5e..fdb285e 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.5.query.sqlpp
@@ -16,10 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
 
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
+use testdv2;
 
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+select value roundtrip(d)
+from testds d
+where d.id=1;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
index e17f03f..72dc83c 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
@@ -28,4 +28,5 @@ use test;
 select element result
 from  Animals as test
 with  result as 
roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
-order by result;
+order by result
+;
\ No newline at end of file
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
index 8041b5e..ff61810 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
@@ -23,3 +23,6 @@ create function typeValidation(a, b, c, d, e, f, g, h)
 
 create function typeValidationNullCall(a, b, c, d, e, f, g, h)
   as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+
+create function stringTest(s) as "roundtrip",
+  "Tests.roundtrip" at testlib;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.adm.template
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.adm.template
new file mode 100644
index 0000000..a0cc7aa
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/big_object_roundtrip_20M/big_object_roundtrip_20M.1.adm.template
@@ -0,0 +1 @@
+{ "id": 1, "name": "Person One", "hobbies": [ "%lorembytes:20971520%" ] }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
index 65a7c81..c960344 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/external-library/mysentiment_twitter/mysentiment_twitter.13.adm
@@ -1,9 +1,9 @@
 { "id": 670301227662491648, "len": 20, "sent": 1 }
 { "id": 670301227553566720, "len": 139, "sent": 0 }
 { "id": 670301227041857536, "len": 112, "sent": 0 }
-{ "id": 670301227037519876, "len": 33, "sent": 0 }
+{ "id": 670301227037519876, "len": 34, "sent": 0 }
 { "id": 670301226987159552, "len": 57, "sent": 0 }
-{ "id": 670301226513391616, "len": 28, "sent": 1 }
+{ "id": 670301226513391616, "len": 29, "sent": 1 }
 { "id": 670301226337202180, "len": 77, "sent": 1 }
 { "id": 670301226190278656, "len": 25, "sent": 0 }
 { "id": 670301225959579648, "len": 112, "sent": 1 }
@@ -15,25 +15,25 @@
 { "id": 670301225162661889, "len": 28, "sent": 1 }
 { "id": 670301224885837824, "len": 63, "sent": 0 }
 { "id": 670301224814698496, "len": 59, "sent": 0 }
-{ "id": 670301224709849090, "len": 33, "sent": 1 }
+{ "id": 670301224709849090, "len": 37, "sent": 1 }
 { "id": 670301224684556288, "len": 21, "sent": 0 }
 { "id": 670301224680480768, "len": 39, "sent": 0 }
 { "id": 670301224348946433, "len": 64, "sent": 1 }
 { "id": 670301224261058560, "len": 61, "sent": 1 }
-{ "id": 670301224231690240, "len": 33, "sent": 0 }
-{ "id": 670301224214794240, "len": 33, "sent": 0 }
+{ "id": 670301224231690240, "len": 34, "sent": 0 }
+{ "id": 670301224214794240, "len": 41, "sent": 0 }
 { "id": 670301223753351168, "len": 105, "sent": 1 }
 { "id": 670301223426367488, "len": 23, "sent": 0 }
-{ "id": 670301223216545792, "len": 31, "sent": 0 }
+{ "id": 670301223216545792, "len": 34, "sent": 0 }
 { "id": 670301223182974976, "len": 34, "sent": 1 }
 { "id": 670301223128535041, "len": 21, "sent": 0 }
 { "id": 670301222759301121, "len": 132, "sent": 0 }
 { "id": 670301222734307329, "len": 110, "sent": 1 }
 { "id": 670301222717419520, "len": 81, "sent": 0 }
 { "id": 670301222318936064, "len": 110, "sent": 1 }
-{ "id": 670301222302150657, "len": 131, "sent": 0 }
+{ "id": 670301222302150657, "len": 135, "sent": 0 }
 { "id": 670301222222602240, "len": 43, "sent": 1 }
-{ "id": 670301222113517568, "len": 27, "sent": 0 }
+{ "id": 670301222113517568, "len": 29, "sent": 0 }
 { "id": 670301221836615680, "len": 44, "sent": 1 }
 { "id": 670301221719310336, "len": 28, "sent": 0 }
 { "id": 670301221442486272, "len": 34, "sent": 0 }
@@ -44,13 +44,13 @@
 { "id": 670301220305821696, "len": 140, "sent": 0 }
 { "id": 670301220247072770, "len": 83, "sent": 1 }
 { "id": 670301220196626432, "len": 36, "sent": 0 }
-{ "id": 670301220079312901, "len": 31, "sent": 1 }
-{ "id": 670301219949305857, "len": 70, "sent": 1 }
+{ "id": 670301220079312901, "len": 32, "sent": 1 }
+{ "id": 670301219949305857, "len": 94, "sent": 1 }
 { "id": 670301219739574273, "len": 131, "sent": 1 }
 { "id": 670301219206877184, "len": 27, "sent": 0 }
 { "id": 670301219139620864, "len": 124, "sent": 0 }
-{ "id": 670301218737123328, "len": 124, "sent": 0 }
-{ "id": 670301218640531458, "len": 31, "sent": 1 }
+{ "id": 670301218737123328, "len": 126, "sent": 0 }
+{ "id": 670301218640531458, "len": 33, "sent": 1 }
 { "id": 670301218598756352, "len": 47, "sent": 0 }
 { "id": 670301218565156865, "len": 44, "sent": 0 }
 { "id": 670301218414206976, "len": 71, "sent": 1 }
@@ -58,14 +58,14 @@
 { "id": 670301218078629888, "len": 9, "sent": 0 }
 { "id": 670301217851990017, "len": 111, "sent": 0 }
 { "id": 670301217793269760, "len": 113, "sent": 0 }
-{ "id": 670301217508036608, "len": 47, "sent": 0 }
+{ "id": 670301217508036608, "len": 55, "sent": 0 }
 { "id": 670301217369657344, "len": 137, "sent": 0 }
-{ "id": 670301217311088641, "len": 28, "sent": 0 }
+{ "id": 670301217311088641, "len": 29, "sent": 0 }
 { "id": 670301217231347712, "len": 123, "sent": 0 }
 { "id": 670301216891473920, "len": 44, "sent": 0 }
 { "id": 670301216874721280, "len": 68, "sent": 0 }
 { "id": 670301216799232000, "len": 50, "sent": 1 }
-{ "id": 670301216669171713, "len": 54, "sent": 0 }
+{ "id": 670301216669171713, "len": 55, "sent": 0 }
 { "id": 670301216493060097, "len": 113, "sent": 1 }
 { "id": 670301216400924676, "len": 35, "sent": 1 }
 { "id": 670301216371552258, "len": 58, "sent": 0 }
@@ -78,21 +78,21 @@
 { "id": 670301214958055424, "len": 58, "sent": 1 }
 { "id": 670301214605733888, "len": 139, "sent": 1 }
 { "id": 670301214509129728, "len": 114, "sent": 1 }
-{ "id": 670301214442041344, "len": 18, "sent": 1 }
+{ "id": 670301214442041344, "len": 19, "sent": 1 }
 { "id": 670301214295392256, "len": 47, "sent": 0 }
-{ "id": 670301213737529344, "len": 9, "sent": 0 }
+{ "id": 670301213737529344, "len": 10, "sent": 0 }
 { "id": 670301213544595457, "len": 63, "sent": 1 }
 { "id": 670301213515235333, "len": 107, "sent": 0 }
 { "id": 670301213464899584, "len": 105, "sent": 1 }
 { "id": 670301213120942080, "len": 39, "sent": 0 }
 { "id": 670301212961603585, "len": 63, "sent": 0 }
-{ "id": 670301212961603584, "len": 20, "sent": 0 }
-{ "id": 670301212856737792, "len": 51, "sent": 0 }
+{ "id": 670301212961603584, "len": 25, "sent": 0 }
+{ "id": 670301212856737792, "len": 55, "sent": 0 }
 { "id": 670301212760117248, "len": 133, "sent": 1 }
 { "id": 670301211808010240, "len": 103, "sent": 0 }
-{ "id": 670301211774468096, "len": 40, "sent": 0 }
+{ "id": 670301211774468096, "len": 41, "sent": 0 }
 { "id": 670301211703144450, "len": 138, "sent": 1 }
-{ "id": 670301211581685761, "len": 25, "sent": 1 }
+{ "id": 670301211581685761, "len": 26, "sent": 1 }
 { "id": 670301211560685568, "len": 12, "sent": 1 }
 { "id": 670301211090751490, "len": 140, "sent": 0 }
 { "id": 670301210654699520, "len": 13, "sent": 0 }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
index 5fc7316..686ede2 100644
--- a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_it_python.xml
@@ -85,5 +85,10 @@ ArithmeticError: oof
         <output-dir compare="Text">big_object_pyudf</output-dir>
       </compilation-unit>
     </test-case>
+    <test-case FilePath="external-library">
+      <compilation-unit name="big_object_roundtrip_20M">
+        <output-dir compare="Text">big_object_roundtrip_20M</output-dir>
+      </compilation-unit>
+    </test-case>
   </test-group>
 </test-suite>
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index d49dffc..84e5c22 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -85,6 +85,7 @@ public enum ErrorCode implements IError {
 
     EXTERNAL_UDF_RESULT_TYPE_ERROR(200),
     EXTERNAL_UDF_EXCEPTION(201),
+    EXTERNAL_UDF_PROTO_RETURN_EXCEPTION(202),
 
     // Compilation errors
     PARSE_ERROR(1001),
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 136e169..be4f512 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -92,6 +92,7 @@
 
 200 = External UDF cannot produce expected result. Please check the UDF 
configuration
 201 = External UDF returned exception. Returned exception was: %1$s
+202 = External UDF protocol encountered an unexpected return result.
 
 # Compile-time check errors
 1001 = Syntax error: %1$s
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/ListLikeNumericArrayFactory.java
similarity index 61%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/ListLikeNumericArrayFactory.java
index e17f03f..e53f157 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/ListLikeNumericArrayFactory.java
@@ -16,16 +16,21 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Access a records nested records at each level.
-* Expected Res : Success
-* Date         : 04 Jun 2015
-*/
+package org.apache.asterix.external.input.stream.builders;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.om.util.container.IObjectFactory;
 
-use test;
+public class ListLikeNumericArrayFactory<T extends Number> implements 
IObjectFactory<List<T>, T> {
 
+    @Override
+    public List<T> create(T arg) {
+        List<T> list = new ArrayList<>(arg.intValue());
+        list.addAll(Collections.nCopies(arg.intValue(), arg));
+        return list;
+    }
 
-select element result
-from  Animals as test
-with  result as 
roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
-order by result;
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/StandardToModifiedUTF8DataOutputFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/StandardToModifiedUTF8DataOutputFactory.java
new file mode 100644
index 0000000..606aee2
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/stream/builders/StandardToModifiedUTF8DataOutputFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.stream.builders;
+
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AStringSerializerDeserializer;
+import 
org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.util.container.IObjectFactory;
+import org.apache.hyracks.util.string.UTF8StringReader;
+import org.apache.hyracks.util.string.UTF8StringWriter;
+
+public class StandardToModifiedUTF8DataOutputFactory
+        implements IObjectFactory<StandardUTF8ToModifiedUTF8DataOutput, 
ATypeTag> {
+
+    @Override
+    public StandardUTF8ToModifiedUTF8DataOutput create(ATypeTag type) {
+        return new StandardUTF8ToModifiedUTF8DataOutput(
+                new AStringSerializerDeserializer(new UTF8StringWriter(), new 
UTF8StringReader()));
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
index cd7ec18..c803517 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonIPCProto.java
@@ -18,14 +18,28 @@ package org.apache.asterix.external.ipc;
 
 import static org.apache.hyracks.ipc.impl.Message.HEADER_SIZE;
 
+import java.io.DataOutput;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
+import org.apache.asterix.om.pointables.AFlatValuePointable;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.PointableAllocator;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.ipc.impl.Message;
 import org.msgpack.core.MessagePack;
 import org.msgpack.core.MessageUnpacker;
@@ -33,26 +47,32 @@ import org.msgpack.core.buffer.ArrayBufferInput;
 
 public class PythonIPCProto {
 
-    private PythonMessageBuilder messageBuilder;
-    private OutputStream sockOut;
-    private ByteBuffer headerBuffer = ByteBuffer.allocate(21);
+    private final PythonMessageBuilder messageBuilder;
+    private final DataOutputStream sockOut;
+    private final ByteBuffer headerBuffer = ByteBuffer.allocate(21);
     private ByteBuffer recvBuffer = ByteBuffer.allocate(32768);
-    private ExternalFunctionResultRouter router;
+    private final ExternalFunctionResultRouter router;
     private long routeId;
     private Pair<ByteBuffer, Exception> bufferBox;
-    private Process pythonProc;
+    private final Process pythonProc;
     private long maxFunctionId;
-    private ArrayBufferInput unpackerInput;
-    private MessageUnpacker unpacker;
+    private final ArrayBufferInput unpackerInput;
+    private final MessageUnpacker unpacker;
+    private final ArrayBackedValueStorage argsStorage;
+    private final PointableAllocator pointableAllocator;
+    private final MsgPackPointableVisitor pointableVisitor;
 
     public PythonIPCProto(OutputStream sockOut, ExternalFunctionResultRouter 
router, Process pythonProc) {
-        this.sockOut = sockOut;
+        this.sockOut = new DataOutputStream(sockOut);
         messageBuilder = new PythonMessageBuilder();
         this.router = router;
         this.pythonProc = pythonProc;
-        this.maxFunctionId = 0l;
+        this.maxFunctionId = 0L;
         unpackerInput = new ArrayBufferInput(new byte[0]);
         unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+        this.argsStorage = new ArrayBackedValueStorage();
+        this.pointableAllocator = new PointableAllocator();
+        this.pointableVisitor = new MsgPackPointableVisitor();
     }
 
     public void start() {
@@ -65,10 +85,10 @@ public class PythonIPCProto {
         recvBuffer.clear();
         recvBuffer.position(0);
         recvBuffer.limit(0);
-        messageBuilder.buf.clear();
-        messageBuilder.buf.position(0);
+        messageBuilder.reset();
         messageBuilder.hello();
-        sendMsg(routeId);
+        sendHeader(routeId, messageBuilder.getLength());
+        sendMsg();
         receiveMsg();
         if (getResponseType() != MessageType.HELO) {
             throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -81,10 +101,10 @@ public class PythonIPCProto {
         recvBuffer.clear();
         recvBuffer.position(0);
         recvBuffer.limit(0);
-        messageBuilder.buf.clear();
-        messageBuilder.buf.position(0);
+        messageBuilder.reset();
         messageBuilder.init(module, clazz, fn);
-        sendMsg(functionId);
+        sendHeader(functionId, messageBuilder.getLength());
+        sendMsg();
         receiveMsg();
         if (getResponseType() != MessageType.INIT_RSP) {
             throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -93,14 +113,21 @@ public class PythonIPCProto {
         return functionId;
     }
 
-    public ByteBuffer call(long functionId, ByteBuffer args, int numArgs) 
throws IOException, AsterixException {
+    public ByteBuffer call(long functionId, IAType[] argTypes, 
IValueReference[] argValues, boolean nullCall)
+            throws IOException, AsterixException {
         recvBuffer.clear();
         recvBuffer.position(0);
         recvBuffer.limit(0);
-        messageBuilder.buf.clear();
-        messageBuilder.buf.position(0);
-        messageBuilder.call(args.array(), args.position(), numArgs);
-        sendMsg(functionId);
+        messageBuilder.reset();
+        argsStorage.reset();
+        for (int i = 0; i < argTypes.length; i++) {
+            visitValueRef(argTypes[i], argsStorage.getDataOutput(), 
argValues[i], pointableAllocator, pointableVisitor,
+                    nullCall);
+        }
+        int len = argsStorage.getLength() + 5;
+        sendHeader(functionId, len);
+        messageBuilder.call(argValues.length, len);
+        sendMsg(argsStorage);
         receiveMsg();
         if (getResponseType() != MessageType.CALL_RSP) {
             throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -109,14 +136,16 @@ public class PythonIPCProto {
         return recvBuffer;
     }
 
-    public ByteBuffer callMulti(long key, ByteBuffer args, int numTuples) 
throws IOException, AsterixException {
+    public ByteBuffer callMulti(long key, ArrayBackedValueStorage args, int 
numTuples)
+            throws IOException, AsterixException {
         recvBuffer.clear();
         recvBuffer.position(0);
         recvBuffer.limit(0);
-        messageBuilder.buf.clear();
-        messageBuilder.buf.position(0);
-        messageBuilder.callMulti(args.array(), args.position(), numTuples);
-        sendMsg(key);
+        messageBuilder.reset();
+        int len = args.getLength() + 4;
+        sendHeader(key, len);
+        messageBuilder.callMulti(0, numTuples);
+        sendMsg(args);
         receiveMsg();
         if (getResponseType() != MessageType.CALL_RSP) {
             throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
@@ -132,7 +161,7 @@ public class PythonIPCProto {
     }
 
     public void receiveMsg() throws IOException, AsterixException {
-        Exception except = null;
+        Exception except;
         try {
             synchronized (bufferBox) {
                 while ((bufferBox.getFirst().limit() == 0 || 
bufferBox.getSecond() != null) && pythonProc.isAlive()) {
@@ -162,15 +191,27 @@ public class PythonIPCProto {
         }
     }
 
-    public void sendMsg(long key) throws IOException {
+    public void sendHeader(long key, int msgLen) throws IOException {
         headerBuffer.clear();
         headerBuffer.position(0);
-        headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + 
messageBuilder.buf.position());
+        headerBuffer.putInt(HEADER_SIZE + Integer.BYTES + msgLen);
         headerBuffer.putLong(key);
         headerBuffer.putLong(routeId);
         headerBuffer.put(Message.NORMAL);
         sockOut.write(headerBuffer.array(), 0, HEADER_SIZE + Integer.BYTES);
-        sockOut.write(messageBuilder.buf.array(), 0, 
messageBuilder.buf.position());
+        sockOut.flush();
+    }
+
+    /**
+     * Writes the bytes buffered in the message builder, followed by the payload held in
+     * {@code content}, to the socket stream and then flushes the stream.
+     *
+     * @param content payload written immediately after the builder's bytes
+     * @throws IOException if writing to or flushing the socket stream fails
+     */
+    public void sendMsg(ArrayBackedValueStorage content) throws IOException {
+        ByteBuffer preamble = messageBuilder.getBuf();
+        sockOut.write(preamble.array(), preamble.arrayOffset(), preamble.position());
+        sockOut.write(content.getByteArray(), content.getStartOffset(), content.getLength());
+        sockOut.flush();
+    }
+
+    /**
+     * Writes the bytes buffered in the message builder (from the buffer's array offset, for
+     * {@code position()} bytes) to the socket stream and then flushes the stream.
+     *
+     * @throws IOException if writing to or flushing the socket stream fails
+     */
+    public void sendMsg() throws IOException {
+        ByteBuffer preamble = messageBuilder.getBuf();
+        sockOut.write(preamble.array(), preamble.arrayOffset(), preamble.position());
         sockOut.flush();
     }
 
@@ -182,4 +223,68 @@ public class PythonIPCProto {
         return routeId;
     }
 
+    /**
+     * @return the output stream this protocol instance writes its headers and messages to
+     */
+    public DataOutputStream getSockOut() {
+        return sockOut;
+    }
+
+    /**
+     * Wraps {@code valueReference} in the visitable pointable matching its type and hands it to
+     * {@code pointableVisitor}, together with the type info obtained from
+     * {@code pointableVisitor.getTypeInfo(type, out)}.
+     * <ul>
+     * <li>OBJECT values are visited as records, ARRAY/MULTISET values as lists.</li>
+     * <li>ANY values are dispatched on the runtime type tag read from the first byte of the value
+     * (at its start offset), using the builtin type for that tag.</li>
+     * <li>MISSING/NULL values are skipped entirely when {@code visitNull} is false; otherwise they
+     * are visited as flat values, like every remaining type.</li>
+     * </ul>
+     * NOTE(review): the visitor presumably serializes the value as MessagePack into {@code out};
+     * that is not visible in this method - confirm against MsgPackPointableVisitor.
+     *
+     * @param type declared type of the value
+     * @param out output passed to the visitor through the type info
+     * @param valueReference the value's bytes
+     * @param pointableAllocator source of the record/list/flat pointables
+     * @param pointableVisitor visitor that processes the pointable
+     * @param visitNull whether MISSING/NULL values are visited (true) or silently skipped (false)
+     * @throws IOException propagated from the visitor
+     */
+    public static void visitValueRef(IAType type, DataOutput out, 
IValueReference valueReference,
+            PointableAllocator pointableAllocator, MsgPackPointableVisitor 
pointableVisitor, boolean visitNull)
+            throws IOException {
+        IVisitablePointable pointable;
+        switch (type.getTypeTag()) {
+            case OBJECT:
+                pointable = pointableAllocator.allocateRecordValue(type);
+                pointable.set(valueReference);
+                pointableVisitor.visit((ARecordVisitablePointable) pointable, 
pointableVisitor.getTypeInfo(type, out));
+                break;
+            case ARRAY:
+            case MULTISET:
+                pointable = pointableAllocator.allocateListValue(type);
+                pointable.set(valueReference);
+                pointableVisitor.visit((AListVisitablePointable) pointable, 
pointableVisitor.getTypeInfo(type, out));
+                break;
+            case ANY:
+                // Declared type is open: recover the runtime type from the tag byte that leads the value.
+                ATypeTag rtTypeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                        
.deserialize(valueReference.getByteArray()[valueReference.getStartOffset()]);
+                IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
+                switch (rtTypeTag) {
+                    case OBJECT:
+                        pointable = 
pointableAllocator.allocateRecordValue(rtType);
+                        pointable.set(valueReference);
+                        pointableVisitor.visit((ARecordVisitablePointable) 
pointable,
+                                pointableVisitor.getTypeInfo(rtType, out));
+                        break;
+                    case ARRAY:
+                    case MULTISET:
+                        pointable = 
pointableAllocator.allocateListValue(rtType);
+                        pointable.set(valueReference);
+                        pointableVisitor.visit((AListVisitablePointable) 
pointable,
+                                pointableVisitor.getTypeInfo(rtType, out));
+                        break;
+                    case MISSING:
+                    case NULL:
+                        if (!visitNull) {
+                            return;
+                        }
+                        // intentional fall-through: a visited MISSING/NULL is handled as a flat value
+                    default:
+                        pointable = 
pointableAllocator.allocateFieldValue(rtType);
+                        pointable.set(valueReference);
+                        pointableVisitor.visit((AFlatValuePointable) pointable,
+                                pointableVisitor.getTypeInfo(rtType, out));
+                        break;
+                }
+                break;
+            case MISSING:
+            case NULL:
+                if (!visitNull) {
+                    return;
+                }
+                // intentional fall-through: a visited MISSING/NULL is handled as a flat value
+            default:
+                pointable = pointableAllocator.allocateFieldValue(type);
+                pointable.set(valueReference);
+                pointableVisitor.visit((AFlatValuePointable) pointable, 
pointableVisitor.getTypeInfo(type, out));
+                break;
+        }
+    }
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
index 5052eb4..5429657 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/ipc/PythonMessageBuilder.java
@@ -26,15 +26,13 @@ import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 
-import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
-import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
 public class PythonMessageBuilder {
-    private static final int MAX_BUF_SIZE = 64 * 1024 * 1024; //64MB.
     MessageType type;
     long dataLength;
-    ByteBuffer buf;
+    private final ByteBuffer buf;
 
     public PythonMessageBuilder() {
         this.type = null;
@@ -42,12 +40,25 @@ public class PythonMessageBuilder {
         this.buf = ByteBuffer.allocate(4096);
     }
 
+    /**
+     * Clears the message buffer via {@link ByteBuffer#clear()}, so that the next message is
+     * written starting at position 0.
+     */
+    public void reset() {
+        //TODO: should be able to get away w/o clearing buf?
+        buf.clear();
+    }
+
+    /**
+     * @return the builder's message buffer itself (not a copy)
+     */
+    public ByteBuffer getBuf() {
+        return buf;
+    }
+
+    /**
+     * Returns the number of bytes written into the message buffer so far.
+     * <p>
+     * {@link ByteBuffer#position()} is already relative to the start of the buffer, so it is the
+     * byte count on its own; {@link ByteBuffer#arrayOffset()} only locates the buffer inside its
+     * backing array and must not be subtracted from it.
+     *
+     * @return the count of bytes currently held in the message buffer
+     */
+    public int getLength() {
+        return buf.position();
+    }
+
     public void setType(MessageType type) {
         this.type = type;
     }
 
     public void packHeader() throws HyracksDataException {
-        MessagePackerFromADM.packFixPos(buf, (byte) type.ordinal());
+        MessagePackUtils.packFixPos(buf, (byte) type.ordinal());
     }
 
     //TODO: this is wrong for any multibyte chars
@@ -75,7 +86,7 @@ public class PythonMessageBuilder {
         this.type = MessageType.QUIT;
         dataLength = getStringLength("QUIT");
         packHeader();
-        MessagePackerFromADM.packFixStr(buf, "QUIT");
+        MessagePackUtils.packFixStr(buf, "QUIT");
     }
 
     public void init(final String module, final String clazz, final String fn) 
throws HyracksDataException {
@@ -89,46 +100,27 @@ public class PythonMessageBuilder {
         }
         packHeader();
         int numArgs = clazz == null ? 2 : 3;
-        MessagePackerFromADM.packFixArrayHeader(buf, (byte) numArgs);
-        MessagePackerFromADM.packStr(buf, module);
+        MessagePackUtils.packFixArrayHeader(buf, (byte) numArgs);
+        MessagePackUtils.packStr(buf, module);
         if (clazz != null) {
-            MessagePackerFromADM.packStr(buf, clazz);
+            MessagePackUtils.packStr(buf, clazz);
         }
-        MessagePackerFromADM.packStr(buf, fn);
+        MessagePackUtils.packStr(buf, fn);
     }
 
-    public void call(byte[] args, int lim, int numArgs) throws 
HyracksDataException {
-        if (args.length > buf.capacity()) {
-            int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
-            if (growTo > MAX_BUF_SIZE) {
-                throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
-                        "Unable to allocate message buffer larger than:" + 
MAX_BUF_SIZE + " bytes");
-            }
-            buf = ByteBuffer.allocate(growTo);
-        }
+    public void call(int numArgs, int len) throws HyracksDataException {
         buf.clear();
         buf.position(0);
         this.type = MessageType.CALL;
-        dataLength = 5 + 1 + lim;
+        dataLength = 5 + 1 + len;
         packHeader();
         //TODO: make this switch between fixarray/array16/array32
         buf.put((byte) (FIXARRAY_PREFIX + 1));
-        buf.put(ARRAY32);
-        buf.putInt(numArgs);
-        if (numArgs > 0) {
-            buf.put(args, 0, lim);
-        }
+        buf.put(ARRAY16);
+        buf.putShort((short) numArgs);
     }
 
-    public void callMulti(byte[] args, int lim, int numArgs) throws 
HyracksDataException {
-        if (args.length > buf.capacity()) {
-            int growTo = ExternalFunctionResultRouter.closestPow2(args.length);
-            if (growTo > MAX_BUF_SIZE) {
-                throw HyracksDataException.create(ErrorCode.ILLEGAL_STATE,
-                        "Unable to allocate message buffer larger than:" + 
MAX_BUF_SIZE + " bytes");
-            }
-            buf = ByteBuffer.allocate(growTo);
-        }
+    public void callMulti(int lim, int numArgs) throws HyracksDataException {
         buf.clear();
         buf.position(0);
         this.type = MessageType.CALL;
@@ -137,9 +129,6 @@ public class PythonMessageBuilder {
         //TODO: make this switch between fixarray/array16/array32
         buf.put(ARRAY16);
         buf.putShort((short) numArgs);
-        if (numArgs > 0) {
-            buf.put(args, 0, lim);
-        }
     }
 
     //this is used to send a serialized java inetaddress to the entrypoint so 
it can send it back
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
index 7c860a2..94a4dd2 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/ExternalScalarPythonFunctionEvaluator.java
@@ -61,6 +61,7 @@ class ExternalScalarPythonFunctionEvaluator extends 
ExternalScalarFunctionEvalua
 
     private MessageUnpacker unpacker;
     private ArrayBufferInput unpackerInput;
+    private MessageUnpackerToADM unpackerToADM;
 
     private long fnId;
 
@@ -87,6 +88,7 @@ class ExternalScalarPythonFunctionEvaluator extends 
ExternalScalarFunctionEvalua
         this.sourceLocation = sourceLoc;
         this.unpackerInput = new ArrayBufferInput(new byte[0]);
         this.unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+        this.unpackerToADM = new MessageUnpackerToADM();
     }
 
     @Override
@@ -107,18 +109,13 @@ class ExternalScalarPythonFunctionEvaluator extends 
ExternalScalarFunctionEvalua
                     hasNullArg = true;
                 }
             }
-            try {
-                PythonLibraryEvaluator.setArgument(argTypes[i], argValues[i], 
argHolder, nullCall);
-            } catch (IOException e) {
-                throw new HyracksDataException("Error evaluating Python UDF", 
e);
-            }
         }
         if (!nullCall && hasNullArg) {
             PointableHelper.setNull(result);
             return;
         }
         try {
-            ByteBuffer res = libraryEvaluator.callPython(fnId, argHolder, 
argTypes.length);
+            ByteBuffer res = libraryEvaluator.callPython(fnId, argTypes, 
argValues, nullCall);
             resultBuffer.reset();
             wrap(res, resultBuffer.getDataOutput());
         } catch (Exception e) {
@@ -133,30 +130,28 @@ class ExternalScalarPythonFunctionEvaluator extends 
ExternalScalarFunctionEvalua
         outputWrapper.position(0);
         try {
             if (resultWrapper == null) {
-                outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
-                out.write(outputWrapper.array(), 0, outputWrapper.position() + 
outputWrapper.arrayOffset());
+                out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                 return;
             }
             if ((resultWrapper.get() ^ FIXARRAY_PREFIX) != (byte) 2) {
-                throw 
HyracksDataException.create(AsterixException.create(ErrorCode.EXTERNAL_UDF_EXCEPTION,
-                        "Returned result missing outer wrapper"));
+                throw HyracksDataException
+                        
.create(AsterixException.create(ErrorCode.EXTERNAL_UDF_PROTO_RETURN_EXCEPTION));
             }
             int numresults = resultWrapper.get() ^ FIXARRAY_PREFIX;
             if (numresults > 0) {
-                MessageUnpackerToADM.unpack(resultWrapper, outputWrapper, 
true);
+                unpackerToADM.unpack(resultWrapper, out, true);
             }
             unpackerInput.reset(resultWrapper.array(), 
resultWrapper.position() + resultWrapper.arrayOffset(),
                     resultWrapper.remaining());
             unpacker.reset(unpackerInput);
-            int numEntries = unpacker.unpackArrayHeader();
-            for (int j = 0; j < numEntries; j++) {
-                outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+            int numErrors = unpacker.unpackArrayHeader();
+            for (int j = 0; j < numErrors; j++) {
+                out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                 if (evaluatorContext.getWarningCollector().shouldWarn()) {
                     evaluatorContext.getWarningCollector().warn(
                             Warning.of(sourceLocation, 
ErrorCode.EXTERNAL_UDF_EXCEPTION, unpacker.unpackString()));
                 }
             }
-            out.write(outputWrapper.array(), 0, outputWrapper.position() + 
outputWrapper.arrayOffset());
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java
index d1b2685..98755fc 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaFunctionHelper.java
@@ -56,7 +56,7 @@ public class JavaFunctionHelper implements IFunctionHelper {
     private final IObjectPool<IJObject, IAType> objectPool = new 
ListObjectPool<>(JTypeObjectFactory.INSTANCE);
     private final JObjectPointableVisitor pointableVisitor;
     private final PointableAllocator pointableAllocator;
-    private final Map<Integer, TypeInfo> poolTypeInfo;
+    private final Map<Integer, JavaTypeInfo> poolTypeInfo;
     private final Map<String, String> parameters;
     private final IAType[] argTypes;
 
@@ -164,10 +164,10 @@ public class JavaFunctionHelper implements 
IFunctionHelper {
         arguments[index] = jObject;
     }
 
-    private TypeInfo getTypeInfo(int index, IAType type) {
-        TypeInfo typeInfo = poolTypeInfo.get(index);
+    private JavaTypeInfo getTypeInfo(int index, IAType type) {
+        JavaTypeInfo typeInfo = poolTypeInfo.get(index);
         if (typeInfo == null) {
-            typeInfo = new TypeInfo(objectPool, type, type.getTypeTag());
+            typeInfo = new JavaTypeInfo(objectPool, type, type.getTypeTag());
             poolTypeInfo.put(index, typeInfo);
         }
         return typeInfo;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/TypeInfo.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaTypeInfo.java
similarity index 93%
rename from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/TypeInfo.java
rename to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaTypeInfo.java
index 453cf39..e60cc5d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/TypeInfo.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/JavaTypeInfo.java
@@ -23,13 +23,13 @@ import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.IAType;
 import org.apache.asterix.om.util.container.IObjectPool;
 
-public class TypeInfo {
+public class JavaTypeInfo {
 
     private IObjectPool<IJObject, IAType> objectPool;
     private IAType atype;
     private ATypeTag typeTag;
 
-    public TypeInfo(IObjectPool<IJObject, IAType> objectPool, IAType atype, 
ATypeTag typeTag) {
+    public JavaTypeInfo(IObjectPool<IJObject, IAType> objectPool, IAType 
atype, ATypeTag typeTag) {
         this.objectPool = objectPool;
         this.atype = atype;
         this.typeTag = typeTag;
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PyTypeInfo.java
similarity index 54%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PyTypeInfo.java
index e17f03f..e2b7ad4 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/py_nested_access/py_nested_access.4.query.sqlpp
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PyTypeInfo.java
@@ -16,16 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-/*
-* Description  : Access a records nested records at each level.
-* Expected Res : Success
-* Date         : 04 Jun 2015
-*/
+package org.apache.asterix.external.library;
+
+import java.io.DataOutput;
+import java.util.Objects;
+
+import org.apache.asterix.om.types.IAType;
+
+/**
+ * Immutable pair of a type and the {@link DataOutput} associated with it. Instances are passed
+ * as the argument of the pointable visitor used for Python UDF arguments (see
+ * {@code pointableVisitor.getTypeInfo(type, out)}).
+ * <p>
+ * Two instances are equal when both their output and their type are equal, which keeps
+ * {@link #equals(Object)} consistent with {@link #hashCode()}.
+ */
+public class PyTypeInfo {
+
+    private final IAType type;
+    private final DataOutput out;
+
+    /**
+     * @param type the type described by this info
+     * @param out the output associated with the type
+     */
+    public PyTypeInfo(IAType type, DataOutput out) {
+        this.type = type;
+        this.out = out;
+    }
+
+    /**
+     * @return the output given at construction
+     */
+    public DataOutput getDataOutput() {
+        return out;
+    }
+
+    /**
+     * @return the type given at construction
+     */
+    public IAType getType() {
+        return type;
+    }
 
-use test;
+    @Override
+    public int hashCode() {
+        return Objects.hash(out, type);
+    }
 
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof PyTypeInfo)) {
+            return false;
+        }
+        // Compare against the OTHER instance's fields; null-safe to match Objects.hash above.
+        PyTypeInfo other = (PyTypeInfo) obj;
+        return Objects.equals(out, other.out) && Objects.equals(type, other.type);
+    }
 
-select element result
-from  Animals as test
-with  result as 
roundtrip(test)[0].class.fullClassification.lower.lower.lower.lower.lower.lower.Species
-order by result;
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
index 457b86a..f82b30d 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/PythonLibraryEvaluator.java
@@ -34,7 +34,7 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.external.ipc.ExternalFunctionResultRouter;
 import org.apache.asterix.external.ipc.PythonIPCProto;
-import org.apache.asterix.external.library.msgpack.MessagePackerFromADM;
+import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.om.functions.IExternalFunctionInfo;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.asterix.om.types.EnumDeserializer;
@@ -50,6 +50,7 @@ import org.apache.hyracks.api.job.JobId;
 import org.apache.hyracks.api.resources.IDeallocatable;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.hyracks.data.std.primitive.TaggedValuePointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.std.base.AbstractStateObject;
 import org.apache.hyracks.ipc.impl.IPCSystem;
 
@@ -86,7 +87,6 @@ public class PythonLibraryEvaluator extends 
AbstractStateObject implements IDeal
         this.ipcSys = ipcSys;
         this.warningCollector = warningCollector;
         this.sourceLoc = sourceLoc;
-
     }
 
     private void initialize() throws IOException, AsterixException {
@@ -128,10 +128,11 @@ public class PythonLibraryEvaluator extends 
AbstractStateObject implements IDeal
         return proto.init(packageModule, clazz, fn);
     }
 
-    public ByteBuffer callPython(long id, ByteBuffer arguments, int numArgs) 
throws IOException {
+    public ByteBuffer callPython(long id, IAType[] argTypes, IValueReference[] 
valueReferences, boolean nullCall)
+            throws IOException {
         ByteBuffer ret = null;
         try {
-            ret = proto.call(id, arguments, numArgs);
+            ret = proto.call(id, argTypes, valueReferences, nullCall);
         } catch (AsterixException e) {
             if (warningCollector.shouldWarn()) {
                 warningCollector.warn(Warning.of(sourceLoc, 
EXTERNAL_UDF_EXCEPTION, e.getMessage()));
@@ -140,7 +141,7 @@ public class PythonLibraryEvaluator extends 
AbstractStateObject implements IDeal
         return ret;
     }
 
-    public ByteBuffer callPythonMulti(long id, ByteBuffer arguments, int 
numTuples) throws IOException {
+    public ByteBuffer callPythonMulti(long id, ArrayBackedValueStorage 
arguments, int numTuples) throws IOException {
         ByteBuffer ret = null;
         try {
             ret = proto.callMulti(id, arguments, numTuples);
@@ -169,20 +170,6 @@ public class PythonLibraryEvaluator extends 
AbstractStateObject implements IDeal
         router.removeRoute(proto.getRouteId());
     }
 
-    public static ATypeTag setArgument(IAType type, IValueReference 
valueReference, ByteBuffer argHolder,
-            boolean nullCall) throws IOException {
-        ATypeTag tag = type.getTypeTag();
-        if (tag == ATypeTag.ANY) {
-            TaggedValuePointable pointy = 
TaggedValuePointable.FACTORY.createPointable();
-            pointy.set(valueReference);
-            ATypeTag rtTypeTag = 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
-            IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
-            return MessagePackerFromADM.pack(valueReference, rtType, 
argHolder, nullCall);
-        } else {
-            return MessagePackerFromADM.pack(valueReference, type, argHolder, 
nullCall);
-        }
-    }
-
     public static ATypeTag peekArgument(IAType type, IValueReference 
valueReference) throws HyracksDataException {
         ATypeTag tag = type.getTypeTag();
         if (tag == ATypeTag.ANY) {
@@ -190,15 +177,15 @@ public class PythonLibraryEvaluator extends 
AbstractStateObject implements IDeal
             pointy.set(valueReference);
             ATypeTag rtTypeTag = 
EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(pointy.getTag());
             IAType rtType = TypeTagUtil.getBuiltinTypeByTag(rtTypeTag);
-            return MessagePackerFromADM.peekUnknown(rtType);
+            return MessagePackUtils.peekUnknown(rtType);
         } else {
-            return MessagePackerFromADM.peekUnknown(type);
+            return MessagePackUtils.peekUnknown(type);
         }
     }
 
-    public static void setVoidArgument(ByteBuffer argHolder) {
-        argHolder.put(ARRAY16);
-        argHolder.putShort((short) 0);
+    public static void setVoidArgument(ArrayBackedValueStorage argHolder) 
throws IOException {
+        argHolder.getDataOutput().writeByte(ARRAY16);
+        argHolder.getDataOutput().writeShort((short) 0);
     }
 
     public static PythonLibraryEvaluator getInstance(IExternalFunctionInfo 
finfo, ILibraryManager libMgr,
@@ -218,4 +205,5 @@ public class PythonLibraryEvaluator extends 
AbstractStateObject implements IDeal
         }
         return evaluator;
     }
+
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
index 230627f..5479b56 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java
@@ -42,7 +42,7 @@ import org.apache.asterix.external.api.IJListAccessor;
 import org.apache.asterix.external.api.IJObject;
 import org.apache.asterix.external.api.IJObjectAccessor;
 import org.apache.asterix.external.api.IJRecordAccessor;
-import org.apache.asterix.external.library.TypeInfo;
+import org.apache.asterix.external.library.JavaTypeInfo;
 import org.apache.asterix.external.library.java.base.JBoolean;
 import org.apache.asterix.external.library.java.base.JByte;
 import org.apache.asterix.external.library.java.base.JDate;
@@ -356,14 +356,14 @@ public class JObjectAccessors {
 
     public static class JRecordAccessor implements IJRecordAccessor {
 
-        private final TypeInfo typeInfo;
+        private final JavaTypeInfo typeInfo;
         private final JRecord jRecord;
         private final IJObject[] jObjects;
         private final LinkedHashMap<String, IJObject> openFields;
         private final UTF8StringReader reader = new UTF8StringReader();
 
         public JRecordAccessor(ARecordType recordType, IObjectPool<IJObject, 
IAType> objectPool) {
-            this.typeInfo = new TypeInfo(objectPool, null, null);
+            this.typeInfo = new JavaTypeInfo(objectPool, null, null);
             this.jObjects = new IJObject[recordType.getFieldNames().length];
             this.openFields = new LinkedHashMap<>();
             this.jRecord = new JRecord(recordType, jObjects, openFields);
@@ -439,10 +439,10 @@ public class JObjectAccessors {
 
     public static class JListAccessor implements IJListAccessor {
 
-        private final TypeInfo typeInfo;
+        private final JavaTypeInfo typeInfo;
 
         public JListAccessor(IObjectPool<IJObject, IAType> objectPool) {
-            this.typeInfo = new TypeInfo(objectPool, null, null);
+            this.typeInfo = new JavaTypeInfo(objectPool, null, null);
         }
 
         @Override
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java
index 9edc569..bd64922 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectPointableVisitor.java
@@ -25,7 +25,7 @@ import org.apache.asterix.external.api.IJListAccessor;
 import org.apache.asterix.external.api.IJObject;
 import org.apache.asterix.external.api.IJObjectAccessor;
 import org.apache.asterix.external.api.IJRecordAccessor;
-import org.apache.asterix.external.library.TypeInfo;
+import org.apache.asterix.external.library.JavaTypeInfo;
 import org.apache.asterix.external.library.java.JObjectAccessors.JListAccessor;
 import 
org.apache.asterix.external.library.java.JObjectAccessors.JRecordAccessor;
 import org.apache.asterix.om.pointables.AFlatValuePointable;
@@ -37,7 +37,7 @@ import org.apache.asterix.om.types.ARecordType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-public class JObjectPointableVisitor implements 
IVisitablePointableVisitor<IJObject, TypeInfo> {
+public class JObjectPointableVisitor implements 
IVisitablePointableVisitor<IJObject, JavaTypeInfo> {
 
     private final Map<ATypeTag, IJObjectAccessor> flatJObjectAccessors = new 
HashMap<ATypeTag, IJObjectAccessor>();
     private final Map<IVisitablePointable, IJRecordAccessor> 
raccessorToJObject =
@@ -46,7 +46,7 @@ public class JObjectPointableVisitor implements 
IVisitablePointableVisitor<IJObj
             new HashMap<IVisitablePointable, IJListAccessor>();
 
     @Override
-    public IJObject visit(AListVisitablePointable accessor, TypeInfo arg) 
throws HyracksDataException {
+    public IJObject visit(AListVisitablePointable accessor, JavaTypeInfo arg) 
throws HyracksDataException {
         IJObject result = null;
         IJListAccessor jListAccessor = laccessorToPrinter.get(accessor);
         if (jListAccessor == null) {
@@ -58,7 +58,7 @@ public class JObjectPointableVisitor implements 
IVisitablePointableVisitor<IJObj
     }
 
     @Override
-    public IJObject visit(ARecordVisitablePointable accessor, TypeInfo arg) 
throws HyracksDataException {
+    public IJObject visit(ARecordVisitablePointable accessor, JavaTypeInfo 
arg) throws HyracksDataException {
         IJObject result = null;
         IJRecordAccessor jRecordAccessor = raccessorToJObject.get(accessor);
         if (jRecordAccessor == null) {
@@ -70,7 +70,7 @@ public class JObjectPointableVisitor implements 
IVisitablePointableVisitor<IJObj
     }
 
     @Override
-    public IJObject visit(AFlatValuePointable accessor, TypeInfo arg) throws 
HyracksDataException {
+    public IJObject visit(AFlatValuePointable accessor, JavaTypeInfo arg) 
throws HyracksDataException {
         ATypeTag typeTag = arg.getTypeTag();
         IJObject result = null;
         IJObjectAccessor jObjectAccessor = flatJObjectAccessors.get(typeTag);
@@ -83,7 +83,8 @@ public class JObjectPointableVisitor implements 
IVisitablePointableVisitor<IJObj
         return result;
     }
 
-    public IJObject visit(AFlatValuePointable accessor, ATypeTag typeTag, 
TypeInfo arg) throws HyracksDataException {
+    public IJObject visit(AFlatValuePointable accessor, ATypeTag typeTag, 
JavaTypeInfo arg)
+            throws HyracksDataException {
         IJObject result = null;
         IJObjectAccessor jObjectAccessor = flatJObjectAccessors.get(typeTag);
         if (jObjectAccessor == null) {
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackAccessor.java
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackAccessor.java
index 8041b5e..6b2f0f8 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackAccessor.java
@@ -16,10 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
+package org.apache.asterix.external.library.msgpack;
 
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
+import java.io.IOException;
 
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+@FunctionalInterface
+public interface IMsgPackAccessor<T, U, R> {
+    R apply(T t, U u) throws IOException;
+}
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackListAccessor.java
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackListAccessor.java
index 8041b5e..a061a06 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackListAccessor.java
@@ -16,10 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
+package org.apache.asterix.external.library.msgpack;
 
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+@FunctionalInterface
+public interface IMsgPackListAccessor<P, D, T, V, R> {
+
+    R access(P p, D d, T t, V v) throws HyracksDataException;
+
+}
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackRecordAccessor.java
similarity index 75%
copy from 
asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackRecordAccessor.java
index 8041b5e..0ce417d 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/external-library/python_open_type_validation/type_validation.2.ddl.sqlpp
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/IMsgPackRecordAccessor.java
@@ -16,10 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-use externallibtest;
+package org.apache.asterix.external.library.msgpack;
 
-create function typeValidation(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 
-create function typeValidationNullCall(a, b, c, d, e, f, g, h)
-  as "roundtrip", "Tests.roundtrip" at testlib with {"null-call": true};
+@FunctionalInterface
+public interface IMsgPackRecordAccessor<P, D, T, V, R> {
+
+    R access(P p, D d, T t, V v) throws HyracksDataException;
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackUtils.java
new file mode 100644
index 0000000..2377f9a
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackUtils.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
+import static org.msgpack.core.MessagePack.Code.FIXSTR_PREFIX;
+import static org.msgpack.core.MessagePack.Code.STR32;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class MessagePackUtils {
+
+    public static ATypeTag peekUnknown(IAType type) {
+        switch (type.getTypeTag()) {
+            case MISSING:
+            case NULL:
+                return type.getTypeTag();
+            default:
+                return ATypeTag.TYPE;
+        }
+    }
+
+    public static void packFixPos(ByteBuffer buf, byte in) throws 
HyracksDataException {
+        byte mask = (byte) (1 << 7);
+        if ((in & mask) != 0) {
+            throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "fixint7 must be positive");
+        }
+        buf.put(in);
+    }
+
+    public static void packFixStr(ByteBuffer buf, String in) throws 
HyracksDataException {
+        byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
+        if (strBytes.length > 31) {
+            throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
+                    "fixint7 must be positive");
+        }
+        buf.put((byte) (FIXSTR_PREFIX + strBytes.length));
+        buf.put(strBytes);
+    }
+
+    public static void packStr(ByteBuffer out, String in) {
+        out.put(STR32);
+        byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
+        out.putInt(strBytes.length);
+        out.put(strBytes);
+    }
+
+    public static void packFixArrayHeader(ByteBuffer buf, byte numObj) {
+        buf.put((byte) (FIXARRAY_PREFIX + (0x0F & numObj)));
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
deleted file mode 100644
index f0ac56e..0000000
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessagePackerFromADM.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.external.library.msgpack;
-
-import static org.msgpack.core.MessagePack.Code.ARRAY32;
-import static org.msgpack.core.MessagePack.Code.FALSE;
-import static org.msgpack.core.MessagePack.Code.FIXARRAY_PREFIX;
-import static org.msgpack.core.MessagePack.Code.FIXSTR_PREFIX;
-import static org.msgpack.core.MessagePack.Code.FLOAT32;
-import static org.msgpack.core.MessagePack.Code.FLOAT64;
-import static org.msgpack.core.MessagePack.Code.INT16;
-import static org.msgpack.core.MessagePack.Code.INT32;
-import static org.msgpack.core.MessagePack.Code.INT64;
-import static org.msgpack.core.MessagePack.Code.INT8;
-import static org.msgpack.core.MessagePack.Code.MAP32;
-import static org.msgpack.core.MessagePack.Code.NIL;
-import static org.msgpack.core.MessagePack.Code.STR32;
-import static org.msgpack.core.MessagePack.Code.TRUE;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
-
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.om.types.ATypeTag;
-import org.apache.asterix.om.types.AUnionType;
-import org.apache.asterix.om.types.AbstractCollectionType;
-import org.apache.asterix.om.types.IAType;
-import org.apache.asterix.om.types.TypeTagUtil;
-import org.apache.asterix.om.utils.NonTaggedFormatUtil;
-import org.apache.asterix.om.utils.RecordUtil;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.data.std.api.IValueReference;
-import org.apache.hyracks.data.std.primitive.BooleanPointable;
-import org.apache.hyracks.data.std.primitive.BytePointable;
-import org.apache.hyracks.data.std.primitive.DoublePointable;
-import org.apache.hyracks.data.std.primitive.FloatPointable;
-import org.apache.hyracks.data.std.primitive.IntegerPointable;
-import org.apache.hyracks.data.std.primitive.LongPointable;
-import org.apache.hyracks.data.std.primitive.ShortPointable;
-import org.apache.hyracks.util.string.UTF8StringUtil;
-
-public class MessagePackerFromADM {
-
-    private static final int TYPE_TAG_SIZE = 1;
-    private static final int TYPE_SIZE = 1;
-    private static final int LENGTH_SIZE = 4;
-    private static final int ITEM_COUNT_SIZE = 4;
-    private static final int ITEM_OFFSET_SIZE = 4;
-
-    public static ATypeTag pack(IValueReference ptr, IAType type, ByteBuffer 
out, boolean packUnknown)
-            throws HyracksDataException {
-        return pack(ptr.getByteArray(), ptr.getStartOffset(), type, true, 
packUnknown, out);
-    }
-
-    public static ATypeTag pack(byte[] ptr, int offs, IAType type, boolean 
tagged, boolean packUnknown, ByteBuffer out)
-            throws HyracksDataException {
-        int relOffs = tagged ? offs + 1 : offs;
-        ATypeTag tag = type.getTypeTag();
-        switch (tag) {
-            case STRING:
-                packStr(ptr, relOffs, out);
-                break;
-            case BOOLEAN:
-                if (BooleanPointable.getBoolean(ptr, relOffs)) {
-                    out.put(TRUE);
-                } else {
-                    out.put(FALSE);
-                }
-                break;
-            case TINYINT:
-                packByte(out, BytePointable.getByte(ptr, relOffs));
-                break;
-            case SMALLINT:
-                packShort(out, ShortPointable.getShort(ptr, relOffs));
-                break;
-            case INTEGER:
-                packInt(out, IntegerPointable.getInteger(ptr, relOffs));
-                break;
-            case BIGINT:
-                packLong(out, LongPointable.getLong(ptr, relOffs));
-                break;
-            case FLOAT:
-                packFloat(out, FloatPointable.getFloat(ptr, relOffs));
-                break;
-            case DOUBLE:
-                packDouble(out, DoublePointable.getDouble(ptr, relOffs));
-                break;
-            case ARRAY:
-            case MULTISET:
-                packArray(ptr, offs, type, out);
-                break;
-            case OBJECT:
-                packObject(ptr, offs, type, out);
-                break;
-            case MISSING:
-            case NULL:
-                if (packUnknown) {
-                    packNull(out);
-                    break;
-                } else {
-                    return tag;
-                }
-            default:
-                throw 
HyracksDataException.create(AsterixException.create(ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR,
-                        tag.name(), "to a msgpack"));
-        }
-        return ATypeTag.TYPE;
-    }
-
-    public static ATypeTag peekUnknown(IAType type) {
-        switch (type.getTypeTag()) {
-            case MISSING:
-            case NULL:
-                return type.getTypeTag();
-            default:
-                return ATypeTag.TYPE;
-        }
-    }
-
-    public static void packNull(ByteBuffer out) {
-        out.put(NIL);
-    }
-
-    public static void packByte(ByteBuffer out, byte in) {
-        out.put(INT8);
-        out.put(in);
-    }
-
-    public static void packShort(ByteBuffer out, short in) {
-        out.put(INT16);
-        out.putShort(in);
-    }
-
-    public static void packInt(ByteBuffer out, int in) {
-        out.put(INT32);
-        out.putInt(in);
-
-    }
-
-    public static void packLong(ByteBuffer out, long in) {
-        out.put(INT64);
-        out.putLong(in);
-    }
-
-    public static void packFloat(ByteBuffer out, float in) {
-        out.put(FLOAT32);
-        out.putFloat(in);
-    }
-
-    public static void packDouble(ByteBuffer out, double in) {
-        out.put(FLOAT64);
-        out.putDouble(in);
-    }
-
-    public static void packFixPos(ByteBuffer out, byte in) throws 
HyracksDataException {
-        byte mask = (byte) (1 << 7);
-        if ((in & mask) != 0) {
-            throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
-                    "fixint7 must be positive");
-        }
-        out.put(in);
-    }
-
-    public static void packFixStr(ByteBuffer buf, String in) throws 
HyracksDataException {
-        byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
-        if (strBytes.length > 31) {
-            throw 
HyracksDataException.create(org.apache.hyracks.api.exceptions.ErrorCode.ILLEGAL_STATE,
-                    "fixint7 must be positive");
-        }
-        buf.put((byte) (FIXSTR_PREFIX + strBytes.length));
-        buf.put(strBytes);
-    }
-
-    public static void packStr(ByteBuffer out, String in) {
-        out.put(STR32);
-        byte[] strBytes = in.getBytes(StandardCharsets.UTF_8);
-        out.putInt(strBytes.length);
-        out.put(strBytes);
-    }
-
-    private static void packStr(byte[] in, int offs, ByteBuffer out) {
-        out.put(STR32);
-        //TODO: tagged/untagged. closed support is borked so always tagged rn
-        String str = UTF8StringUtil.toString(in, offs);
-        byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
-        out.putInt(strBytes.length);
-        out.put(strBytes);
-    }
-
-    public static void packStr(String str, ByteBuffer out) {
-        out.put(STR32);
-        byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
-        out.putInt(strBytes.length);
-        out.put(strBytes);
-    }
-
-    private static void packArray(byte[] in, int offs, IAType type, ByteBuffer 
out) throws HyracksDataException {
-        //TODO: - could optimize to pack fixarray/array16 for small arrays
-        //      - this code is basically a static version of AListPointable, 
could be deduped
-        AbstractCollectionType collType = (AbstractCollectionType) type;
-        out.put(ARRAY32);
-        int lenOffs = offs + TYPE_TAG_SIZE + TYPE_SIZE;
-        int itemCtOffs = LENGTH_SIZE + lenOffs;
-        int itemCt = IntegerPointable.getInteger(in, itemCtOffs);
-        boolean fixType = NonTaggedFormatUtil.isFixedSizedCollection(type);
-        out.putInt(itemCt);
-        for (int i = 0; i < itemCt; i++) {
-            if (fixType) {
-                int itemOffs = itemCtOffs + ITEM_COUNT_SIZE + (i
-                        * NonTaggedFormatUtil.getFieldValueLength(in, 0, 
collType.getItemType().getTypeTag(), false));
-                pack(in, itemOffs, collType.getItemType(), false, true, out);
-            } else {
-                int itemOffs =
-                        offs + IntegerPointable.getInteger(in, itemCtOffs + 
ITEM_COUNT_SIZE + (i * ITEM_OFFSET_SIZE));
-                ATypeTag tag = 
ATypeTag.VALUE_TYPE_MAPPING[BytePointable.getByte(in, itemOffs)];
-                pack(in, itemOffs, TypeTagUtil.getBuiltinTypeByTag(tag), true, 
true, out);
-            }
-        }
-    }
-
-    private static void packObject(byte[] in, int offs, IAType type, 
ByteBuffer out) throws HyracksDataException {
-        ARecordType recType = (ARecordType) type;
-        out.put(MAP32);
-        int fieldCt = recType.getFieldNames().length + 
RecordUtils.getOpenFieldCount(in, offs, recType);
-        out.putInt(fieldCt);
-        for (int i = 0; i < recType.getFieldNames().length; i++) {
-            String field = recType.getFieldNames()[i];
-            IAType fieldType = RecordUtils.getClosedFieldType(recType, i);
-            packStr(field, out);
-            pack(in, RecordUtils.getClosedFieldOffset(in, offs, recType, i), 
fieldType, false, true, out);
-        }
-        if (RecordUtils.isExpanded(in, offs, recType)) {
-            for (int i = 0; i < RecordUtils.getOpenFieldCount(in, offs, 
recType); i++) {
-                packStr(in, RecordUtils.getOpenFieldNameOffset(in, offs, 
recType, i), out);
-                ATypeTag tag = 
ATypeTag.VALUE_TYPE_MAPPING[RecordUtils.getOpenFieldTag(in, offs, recType, i)];
-                pack(in, RecordUtils.getOpenFieldValueOffset(in, offs, 
recType, i),
-                        TypeTagUtil.getBuiltinTypeByTag(tag), true, true, out);
-            }
-        }
-
-    }
-
-    public static void packFixArrayHeader(ByteBuffer buf, byte numObj) {
-        buf.put((byte) (FIXARRAY_PREFIX + (0x0F & numObj)));
-    }
-
-    private static class RecordUtils {
-
-        static final int TAG_SIZE = 1;
-        static final int RECORD_LENGTH_SIZE = 4;
-        static final int EXPANDED_SIZE = 1;
-        static final int OPEN_OFFSET_SIZE = 4;
-        static final int CLOSED_COUNT_SIZE = 4;
-        static final int FIELD_OFFSET_SIZE = 4;
-        static final int OPEN_COUNT_SIZE = 4;
-        private static final int OPEN_FIELD_HASH_SIZE = 4;
-        private static final int OPEN_FIELD_OFFSET_SIZE = 4;
-        private static final int OPEN_FIELD_HEADER = OPEN_FIELD_HASH_SIZE + 
OPEN_FIELD_OFFSET_SIZE;
-
-        private static boolean isOpen(ARecordType recordType) {
-            return recordType == null || recordType.isOpen();
-        }
-
-        public static int getLength(byte[] bytes, int start) {
-            return IntegerPointable.getInteger(bytes, start + TAG_SIZE);
-        }
-
-        public static boolean isExpanded(byte[] bytes, int start, ARecordType 
recordType) {
-            return isOpen(recordType) && BooleanPointable.getBoolean(bytes, 
start + TAG_SIZE + RECORD_LENGTH_SIZE);
-        }
-
-        public static int getOpenPartOffset(int start, ARecordType recordType) 
{
-            return start + TAG_SIZE + RECORD_LENGTH_SIZE + (isOpen(recordType) 
? EXPANDED_SIZE : 0);
-        }
-
-        public static int getNullBitmapOffset(byte[] bytes, int start, 
ARecordType recordType) {
-            return getOpenPartOffset(start, recordType) + (isExpanded(bytes, 
start, recordType) ? OPEN_OFFSET_SIZE : 0)
-                    + CLOSED_COUNT_SIZE;
-        }
-
-        public static int getNullBitmapSize(ARecordType recordType) {
-            return RecordUtil.computeNullBitmapSize(recordType);
-        }
-
-        public static final IAType getClosedFieldType(ARecordType recordType, 
int fieldId) {
-            IAType aType = recordType.getFieldTypes()[fieldId];
-            if (NonTaggedFormatUtil.isOptional(aType)) {
-                // optional field: add the embedded non-null type tag
-                aType = ((AUnionType) aType).getActualType();
-            }
-            return aType;
-        }
-
-        public static final int getClosedFieldOffset(byte[] bytes, int start, 
ARecordType recordType, int fieldId) {
-            int offset = getNullBitmapOffset(bytes, start, recordType) + 
getNullBitmapSize(recordType)
-                    + fieldId * FIELD_OFFSET_SIZE;
-            return start + IntegerPointable.getInteger(bytes, offset);
-        }
-
-        public static final int getOpenFieldCount(byte[] bytes, int start, 
ARecordType recordType) {
-            return isExpanded(bytes, start, recordType)
-                    ? IntegerPointable.getInteger(bytes, 
getOpenFieldCountOffset(bytes, start, recordType)) : 0;
-        }
-
-        public static int getOpenFieldCountSize(byte[] bytes, int start, 
ARecordType recordType) {
-            return isExpanded(bytes, start, recordType) ? OPEN_COUNT_SIZE : 0;
-        }
-
-        public static int getOpenFieldCountOffset(byte[] bytes, int start, 
ARecordType recordType) {
-            return start + IntegerPointable.getInteger(bytes, 
getOpenPartOffset(start, recordType));
-        }
-
-        public static final int getOpenFieldValueOffset(byte[] bytes, int 
start, ARecordType recordType, int fieldId) {
-            return getOpenFieldNameOffset(bytes, start, recordType, fieldId)
-                    + getOpenFieldNameSize(bytes, start, recordType, fieldId);
-        }
-
-        public static int getOpenFieldNameSize(byte[] bytes, int start, 
ARecordType recordType, int fieldId) {
-            int utfleng = UTF8StringUtil.getUTFLength(bytes, 
getOpenFieldNameOffset(bytes, start, recordType, fieldId));
-            return utfleng + UTF8StringUtil.getNumBytesToStoreLength(utfleng);
-        }
-
-        public static int getOpenFieldNameOffset(byte[] bytes, int start, 
ARecordType recordType, int fieldId) {
-            return getOpenFieldOffset(bytes, start, recordType, fieldId);
-        }
-
-        public static final byte getOpenFieldTag(byte[] bytes, int start, 
ARecordType recordType, int fieldId) {
-            return bytes[getOpenFieldValueOffset(bytes, start, recordType, 
fieldId)];
-        }
-
-        public static int getOpenFieldHashOffset(byte[] bytes, int start, 
ARecordType recordType, int fieldId) {
-            return getOpenFieldCountOffset(bytes, start, recordType) + 
getOpenFieldCountSize(bytes, start, recordType)
-                    + fieldId * OPEN_FIELD_HEADER;
-        }
-
-        public static int getOpenFieldOffset(byte[] bytes, int start, 
ARecordType recordType, int fieldId) {
-            return start
-                    + IntegerPointable.getInteger(bytes, 
getOpenFieldOffsetOffset(bytes, start, recordType, fieldId));
-        }
-
-        public static int getOpenFieldOffsetOffset(byte[] bytes, int start, 
ARecordType recordType, int fieldId) {
-            return getOpenFieldHashOffset(bytes, start, recordType, fieldId) + 
OPEN_FIELD_HASH_SIZE;
-        }
-    }
-
-}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
index 4af1121..d6121e8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MessageUnpackerToADM.java
@@ -18,18 +18,39 @@ package org.apache.asterix.external.library.msgpack;
 
 import static org.msgpack.core.MessagePack.Code.*;
 
+import java.io.DataOutput;
+import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 
+import org.apache.asterix.builders.AbvsBuilderFactory;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
+import 
org.apache.asterix.external.input.stream.StandardUTF8ToModifiedUTF8DataOutput;
+import 
org.apache.asterix.external.input.stream.builders.ListLikeNumericArrayFactory;
+import 
org.apache.asterix.external.input.stream.builders.StandardToModifiedUTF8DataOutputFactory;
 import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.util.container.IObjectPool;
+import org.apache.asterix.om.util.container.ListObjectPool;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.primitive.IntegerPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.util.string.UTF8StringUtil;
 
 public class MessageUnpackerToADM {
 
-    public static void unpack(ByteBuffer in, ByteBuffer out, boolean tagged) 
throws HyracksDataException {
+    private final IObjectPool<IMutableValueStorage, ATypeTag> abvsBuilderPool =
+            new ListObjectPool<>(new AbvsBuilderFactory());
+    private final IObjectPool<StandardUTF8ToModifiedUTF8DataOutput, ATypeTag> 
utfPool =
+            new ListObjectPool<>(new 
StandardToModifiedUTF8DataOutputFactory());
+    private final IObjectPool<List<Long>, Long> listPool = new 
ListObjectPool<>(new ListLikeNumericArrayFactory<>());
+
+    public MessageUnpackerToADM() {
+    }
+
+    public void unpack(ByteBuffer in, DataOutput out, boolean tagged) throws 
IOException {
         byte tag = NIL;
         if (in != null) {
             tag = in.get();
@@ -38,12 +59,12 @@ public class MessageUnpackerToADM {
             unpackStr(in, out, (tag ^ FIXSTR_PREFIX), tagged);
         } else if (isFixInt(tag)) {
             if (tagged) {
-                out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+                out.writeByte(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
             }
             if (isPosFixInt(tag)) {
-                out.put(tag);
+                out.writeByte(tag);
             } else if (isNegFixInt(tag)) {
-                out.put(tag);
+                out.writeByte(tag);
             }
         } else if (isFixedArray(tag)) {
             unpackArray(in, out, (tag ^ FIXARRAY_PREFIX));
@@ -52,15 +73,15 @@ public class MessageUnpackerToADM {
         } else {
             switch (tag) {
                 case TRUE:
-                    out.put(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
-                    out.put((byte) 1);
+                    out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+                    out.writeByte((byte) 1);
                     break;
                 case FALSE:
-                    out.put(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
-                    out.put((byte) 0);
+                    out.writeByte(ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG);
+                    out.writeByte((byte) 0);
                     break;
                 case NIL:
-                    out.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                    out.writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                     break;
                 case UINT8:
                     unpackUByte(in, out, tagged);
@@ -113,7 +134,6 @@ public class MessageUnpackerToADM {
                 case MAP32:
                     unpackMap(in, out, (int) 
Integer.toUnsignedLong(in.getInt()));
                     break;
-
                 default:
                     throw HyracksDataException.create(AsterixException.create(
                             ErrorCode.PARSER_ADM_DATA_PARSER_CAST_ERROR, 
"msgpack tag " + tag + " ", "to an ADM type"));
@@ -121,154 +141,159 @@ public class MessageUnpackerToADM {
         }
     }
 
-    public static void unpackByte(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackByte(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT8_TYPE_TAG);
         }
-        out.put(in.get());
+        out.writeByte(in.get());
     }
 
-    public static void unpackShort(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackShort(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
         }
-        out.putShort(in.getShort());
+        out.writeShort(in.getShort());
     }
 
-    public static void unpackInt(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackInt(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
         }
-        out.putInt(in.getInt());
+        out.writeInt(in.getInt());
     }
 
-    public static void unpackLong(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackLong(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
         }
-        out.putLong(in.getLong());
+        out.writeLong(in.getLong());
     }
 
-    public static void unpackUByte(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackUByte(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT16_TYPE_TAG);
         }
-        out.putShort((short) (in.get() & ((short) 0x00FF)));
+        out.writeShort((short) (in.get() & ((short) 0x00FF)));
     }
 
-    public static void unpackUShort(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackUShort(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT32_TYPE_TAG);
         }
-        out.putInt(in.getShort() & 0x0000FFFF);
+        out.writeInt(in.getShort() & 0x0000FFFF);
     }
 
-    public static void unpackUInt(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackUInt(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
         }
-        out.putLong(in.getInt() & 0x00000000FFFFFFFFl);
+        out.writeLong(in.getInt() & 0x00000000FFFFFFFFl);
     }
 
-    public static void unpackULong(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackULong(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_INT64_TYPE_TAG);
         }
         long val = in.getLong();
         if (val < 0) {
             throw new IllegalArgumentException("Integer overflow");
         }
-        out.putLong(val);
+        out.writeLong(val);
     }
 
-    public static void unpackFloat(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackFloat(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_FLOAT_TYPE_TAG);
         }
-        out.putFloat(in.getFloat());
+        out.writeFloat(in.getFloat());
 
     }
 
-    public static void unpackDouble(ByteBuffer in, ByteBuffer out, boolean 
tagged) {
+    public static void unpackDouble(ByteBuffer in, DataOutput out, boolean 
tagged) throws IOException {
         if (tagged) {
-            out.put(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_DOUBLE_TYPE_TAG);
         }
-        out.putDouble(in.getDouble());
+        out.writeDouble(in.getDouble());
     }
 
-    public static void unpackArray(ByteBuffer in, ByteBuffer out, long uLen) 
throws HyracksDataException {
+    public void unpackArray(ByteBuffer in, DataOutput out, long uLen) throws 
IOException {
         if (uLen > Integer.MAX_VALUE) {
             throw new UnsupportedOperationException("Array is too long");
         }
+        ArrayBackedValueStorage buildBuf = (ArrayBackedValueStorage) 
abvsBuilderPool.allocate(ATypeTag.ARRAY);
+        buildBuf.reset();
+        DataOutput bufOut = buildBuf.getDataOutput();
         int count = (int) uLen;
-        int offs = out.position();
-        out.put(ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
-        out.put(ATypeTag.ANY.serialize());
-        int asxLenPos = out.position();
+        bufOut.writeByte(ATypeTag.SERIALIZED_ORDEREDLIST_TYPE_TAG);
+        bufOut.writeByte(ATypeTag.ANY.serialize());
+        int asxLenPos = buildBuf.getLength();
         //reserve space
-        out.putInt(-1);
-        out.putInt(count);
-        int slotStartOffs = out.position() + out.arrayOffset();
+        bufOut.writeInt(-1);
+        bufOut.writeInt(count);
+        int slotStartOffs = buildBuf.getLength();
         for (int i = 0; i < count; i++) {
-            out.putInt(0xFFFF);
+            bufOut.writeInt(0xDEADBEEF);
         }
         for (int i = 0; i < count; i++) {
-            out.putInt(slotStartOffs + (i * 4), (out.position() - offs));
-            unpack(in, out, true);
+            IntegerPointable.setInteger(buildBuf.getByteArray(), 
((slotStartOffs) + (i * 4)), buildBuf.getLength());
+            //tagged b/c any
+            unpack(in, bufOut, true);
         }
-        int totalLen = out.position() - offs;
-        out.putInt(asxLenPos, totalLen);
+        IntegerPointable.setInteger(buildBuf.getByteArray(), asxLenPos, 
buildBuf.getLength());
+        out.write(buildBuf.getByteArray(), buildBuf.getStartOffset(), 
buildBuf.getLength());
     }
 
-    public static void unpackMap(ByteBuffer in, ByteBuffer out, int count) 
throws HyracksDataException {
+    public void unpackMap(ByteBuffer in, DataOutput out, int count) throws 
IOException {
         //TODO: need to handle typed records. this only produces a completely 
open record.
+        ArrayBackedValueStorage buildBuf = (ArrayBackedValueStorage) 
abvsBuilderPool.allocate(ATypeTag.OBJECT);
+        List<Long> offsets = listPool.allocate((long) count);
+        DataOutput bufOut = buildBuf.getDataOutput();
         //hdr size = 6?
-        int startOffs = out.position();
-        out.put(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
-        int totalSizeOffs = out.position();
-        out.putInt(-1);
+        bufOut.writeByte(ATypeTag.SERIALIZED_RECORD_TYPE_TAG);
+        int totalSizeOffs = buildBuf.getLength();
+        bufOut.writeInt(-1);
         //isExpanded
-        out.put((byte) 1);
-        int openPartOffs = out.position();
-        out.putInt(-1);
+        bufOut.writeByte((byte) 1);
+        int openPartOffs = buildBuf.getLength();
+        bufOut.writeInt(-1);
         //isExpanded, so num of open fields
-        out.putInt(openPartOffs, out.position() - startOffs);
-        out.putInt(count);
-        int offsetAryPos = out.position();
+        IntegerPointable.setInteger(buildBuf.getByteArray(), openPartOffs, 
buildBuf.getLength());
+        bufOut.writeInt(count);
+        int offsetAryPos = buildBuf.getLength();
         int offsetArySz = count * 2;
         //allocate space for open field offsets
         for (int i = 0; i < offsetArySz; i++) {
-            out.putInt(0xDEADBEEF);
+            bufOut.writeInt(0xDEADBEEF);
         }
         for (int i = 0; i < count; i++) {
-            int offs = out.position() + out.arrayOffset();
-            int relOffs = offs - startOffs;
-            unpack(in, out, false);
-            int hash = UTF8StringUtil.hash(out.array(), offs);
-            out.putInt(offsetAryPos, hash);
+            int offs = buildBuf.getLength();
+            unpack(in, bufOut, false);
+            long hash = UTF8StringUtil.hash(buildBuf.getByteArray(), offs);
+            offsets.set(i, (hash << 32) + offs);
+            unpack(in, bufOut, true);
+        }
+        Collections.sort(offsets);
+        for (Long l : offsets) {
+            IntegerPointable.setInteger(buildBuf.getByteArray(), offsetAryPos, 
(int) (l >> 32));
             offsetAryPos += 4;
-            out.putInt(offsetAryPos, relOffs);
+            IntegerPointable.setInteger(buildBuf.getByteArray(), offsetAryPos, 
(int) ((l << 32) >> 32));
             offsetAryPos += 4;
-            unpack(in, out, true);
         }
-        out.putInt(totalSizeOffs, out.position() - startOffs);
+        IntegerPointable.setInteger(buildBuf.getByteArray(), totalSizeOffs, 
buildBuf.getLength());
+        out.write(buildBuf.getByteArray(), buildBuf.getStartOffset(), 
buildBuf.getLength());
     }
 
-    public static void unpackStr(ByteBuffer in, ByteBuffer out, long uLen, 
boolean tag) {
-        //TODO: this probably breaks for 3 and 4 byte UTF-8
+    public void unpackStr(ByteBuffer in, DataOutput out, long uLen, boolean 
tag) throws IOException {
         if (tag) {
-            out.put(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
+            out.writeByte(ATypeTag.SERIALIZED_STRING_TYPE_TAG);
         }
         if (Long.compareUnsigned(uLen, Integer.MAX_VALUE) > 0) {
             throw new UnsupportedOperationException("String is too long");
         }
         int len = (int) uLen;
-        int strLen = UTF8StringUtil.getStringLength(in.array(), in.position() 
+ in.arrayOffset(), len);
-        int adv = VarLenIntEncoderDecoder.encode(strLen, out.array(), 
out.position() + out.arrayOffset());
-        out.position(out.position() + adv);
-        System.arraycopy(in.array(), in.arrayOffset() + in.position(), 
out.array(), out.arrayOffset() + out.position(),
-                len);
-        out.position(out.position() + len);
+        StandardUTF8ToModifiedUTF8DataOutput conv = 
utfPool.allocate(ATypeTag.STRING);
+        conv.setDataOutput(out);
+        conv.write(in.array(), in.arrayOffset() + in.position(), len);
         in.position(in.position() + len);
     }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackAccessors.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackAccessors.java
new file mode 100644
index 0000000..a90a183
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackAccessors.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.library.msgpack;
+
+import static org.apache.hyracks.util.string.UTF8StringUtil.getUTFLength;
+import static org.msgpack.core.MessagePack.Code.ARRAY32;
+import static org.msgpack.core.MessagePack.Code.FALSE;
+import static org.msgpack.core.MessagePack.Code.FLOAT32;
+import static org.msgpack.core.MessagePack.Code.FLOAT64;
+import static org.msgpack.core.MessagePack.Code.INT16;
+import static org.msgpack.core.MessagePack.Code.INT32;
+import static org.msgpack.core.MessagePack.Code.INT64;
+import static org.msgpack.core.MessagePack.Code.INT8;
+import static org.msgpack.core.MessagePack.Code.MAP32;
+import static org.msgpack.core.MessagePack.Code.NIL;
+import static org.msgpack.core.MessagePack.Code.STR32;
+import static org.msgpack.core.MessagePack.Code.TRUE;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.dataflow.data.nontagged.printers.PrintTools;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.ADoubleSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AFloatSerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt16SerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt32SerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt64SerializerDeserializer;
+import 
org.apache.asterix.dataflow.data.nontagged.serde.AInt8SerializerDeserializer;
+import org.apache.asterix.external.library.PyTypeInfo;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.base.IVisitablePointable;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.asterix.om.types.AbstractCollectionType;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.asterix.om.types.EnumDeserializer;
+import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.om.types.TypeTagUtil;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.data.std.api.IPointable;
+import org.apache.hyracks.util.encoding.VarLenIntEncoderDecoder;
+
+public class MsgPackAccessors {
+
+    private MsgPackAccessors() {
+    }
+
+    /**
+     * Selects the msgpack writer for a flat (non-nested) ADM value of the given type.
+     *
+     * @param aTypeTag type tag of the value to be serialized
+     * @return the accessor that writes a value of that type to a {@link DataOutput}
+     * @throws HyracksDataException wrapping a TYPE_UNSUPPORTED error when the tag has no flat accessor
+     */
+    public static IMsgPackAccessor<IPointable, DataOutput, Void> createFlatMsgPackAccessor(ATypeTag aTypeTag)
+            throws HyracksDataException {
+        final IMsgPackAccessor<IPointable, DataOutput, Void> accessor;
+        switch (aTypeTag) {
+            case BOOLEAN:
+                accessor = MsgPackBooleanAccessor::apply;
+                break;
+            case TINYINT:
+                accessor = MsgPackInt8Accessor::apply;
+                break;
+            case SMALLINT:
+                accessor = MsgPackInt16Accessor::apply;
+                break;
+            case INTEGER:
+                accessor = MsgPackInt32Accessor::apply;
+                break;
+            case BIGINT:
+                accessor = MsgPackInt64Accessor::apply;
+                break;
+            case FLOAT:
+                accessor = MsgPackFloatAccessor::apply;
+                break;
+            case DOUBLE:
+                accessor = MsgPackDoubleAccessor::apply;
+                break;
+            case STRING:
+                accessor = MsgPackStringAccessor::apply;
+                break;
+            case MISSING:
+            case NULL:
+                // both MISSING and NULL are emitted as msgpack nil
+                accessor = MsgPackNullAccessor::apply;
+                break;
+            default:
+                throw HyracksDataException
+                        .create(AsterixException.create(ErrorCode.TYPE_UNSUPPORTED, "msgpack", aTypeTag.name()));
+        }
+        return accessor;
+    }
+
+    /** Writes a TINYINT value as msgpack INT8: the format byte followed by the single payload byte. */
+    public static class MsgPackInt8Accessor {
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            final byte[] data = pointable.getByteArray();
+            // the value is read one byte past the pointable's start offset
+            final int valueOffset = pointable.getStartOffset() + 1;
+            final byte value = AInt8SerializerDeserializer.getByte(data, valueOffset);
+            out.writeByte(INT8);
+            out.writeByte(value);
+            return null;
+        }
+    }
+
+    /** Writes a SMALLINT value as msgpack INT16: the format byte followed by a two-byte payload. */
+    public static class MsgPackInt16Accessor {
+
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            final byte[] data = pointable.getByteArray();
+            // the value is read one byte past the pointable's start offset
+            final int valueOffset = pointable.getStartOffset() + 1;
+            final short value = AInt16SerializerDeserializer.getShort(data, valueOffset);
+            out.writeByte(INT16);
+            out.writeShort(value);
+            return null;
+        }
+    }
+
+    /** Writes an INTEGER value as msgpack INT32: the format byte followed by a four-byte payload. */
+    public static class MsgPackInt32Accessor {
+
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            byte[] b = pointable.getByteArray();
+            int s = pointable.getStartOffset();
+            // the value is read one byte past the pointable's start offset
+            int i = AInt32SerializerDeserializer.getInt(b, s + 1);
+            out.writeByte(INT32);
+            // INT32 carries a four-byte payload; writeByte would emit only the low-order byte of i
+            out.writeInt(i);
+            return null;
+        }
+    }
+
+    /** Writes msgpack nil; the pointable's contents are not read. */
+    public static class MsgPackNullAccessor {
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            // nil is a lone format byte with no payload
+            out.writeByte(NIL);
+            return null;
+        }
+    }
+
+    /** Writes a BIGINT value as msgpack INT64: the format byte followed by an eight-byte payload. */
+    public static class MsgPackInt64Accessor {
+
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            final byte[] data = pointable.getByteArray();
+            // the value is read one byte past the pointable's start offset
+            final int valueOffset = pointable.getStartOffset() + 1;
+            final long value = AInt64SerializerDeserializer.getLong(data, valueOffset);
+            out.writeByte(INT64);
+            out.writeLong(value);
+            return null;
+        }
+    }
+
+    /** Writes a FLOAT value as msgpack FLOAT32: the format byte followed by a four-byte payload. */
+    public static class MsgPackFloatAccessor {
+
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            final byte[] data = pointable.getByteArray();
+            // the value is read one byte past the pointable's start offset
+            final int valueOffset = pointable.getStartOffset() + 1;
+            final float value = AFloatSerializerDeserializer.getFloat(data, valueOffset);
+            out.writeByte(FLOAT32);
+            out.writeFloat(value);
+            return null;
+        }
+    }
+
+    /** Writes a DOUBLE value as msgpack FLOAT64: the format byte followed by an eight-byte payload. */
+    public static class MsgPackDoubleAccessor {
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            final byte[] data = pointable.getByteArray();
+            // the value is read one byte past the pointable's start offset
+            final int valueOffset = pointable.getStartOffset() + 1;
+            final double value = ADoubleSerializerDeserializer.getDouble(data, valueOffset);
+            out.writeByte(FLOAT64);
+            out.writeDouble(value);
+            return null;
+        }
+    }
+
+    /**
+     * Writes a STRING value as msgpack STR32: the format byte, the length as a four-byte int, then the
+     * string content emitted through {@code PrintTools.writeUTF8StringRaw}.
+     */
+    public static class MsgPackStringAccessor {
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            final byte[] data = pointable.getByteArray();
+            // the string is read one byte past the pointable's start offset
+            final int stringOffset = pointable.getStartOffset() + 1;
+            out.writeByte(STR32);
+            final int utfLength = getUTFLength(data, stringOffset);
+            out.writeInt(utfLength);
+            PrintTools.writeUTF8StringRaw(data, stringOffset, utfLength, out);
+            return null;
+        }
+
+    }
+
+    /** Writes a BOOLEAN value as the single msgpack format byte TRUE or FALSE. */
+    public static class MsgPackBooleanAccessor {
+        public static Void apply(IPointable pointable, DataOutput out) throws IOException {
+            final byte[] data = pointable.getByteArray();
+            // the value is read one byte past the pointable's start offset
+            final int valueOffset = pointable.getStartOffset() + 1;
+            final boolean value = ABooleanSerializerDeserializer.getBoolean(data, valueOffset);
+            out.writeByte(value ? TRUE : FALSE);
+            return null;
+        }
+    }
+
+    /**
+     * Writes an ADM record as msgpack MAP32: the format byte, the number of field names as a four-byte int,
+     * then for each field its name (as a string) followed by its value serialized through the supplied visitor.
+     */
+    public static class MsgPackRecordAccessor {
+
+        /** Decodes the variable-length integer stored at {@code s} in {@code b}. */
+        public static int getUTFLength(byte[] b, int s) {
+            return VarLenIntEncoderDecoder.decode(b, s);
+        }
+
+        /**
+         * @param pointable record to serialize
+         * @param arg carries the record's type and the output to write to
+         * @param pointableVisitor visitor used to serialize each field value
+         * @throws HyracksDataException wrapping any I/O failure raised while writing
+         */
+        public static Void access(ARecordVisitablePointable pointable, PyTypeInfo arg,
+                MsgPackPointableVisitor pointableVisitor) throws HyracksDataException {
+            final List<IVisitablePointable> values = pointable.getFieldValues();
+            final List<IVisitablePointable> typeTags = pointable.getFieldTypeTags();
+            final List<IVisitablePointable> names = pointable.getFieldNames();
+            final DataOutput out = arg.getDataOutput();
+            final ARecordType recordType = (ARecordType) arg.getType();
+            try {
+                out.writeByte(MAP32);
+                out.writeInt(names.size());
+                int fieldIdx = 0;
+                for (IVisitablePointable value : values) {
+                    // indexes below the declared field count use the declared type
+                    final boolean declared = fieldIdx < recordType.getFieldTypes().length;
+                    final IVisitablePointable tagPointable = typeTags.get(fieldIdx);
+                    final ATypeTag runtimeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                            .deserialize(tagPointable.getByteArray()[tagPointable.getStartOffset()]);
+                    final IAType fieldType;
+                    if (declared) {
+                        fieldType = recordType.getFieldTypes()[fieldIdx];
+                    } else {
+                        // remaining fields take the builtin type matching their stored tag
+                        fieldType = TypeTagUtil.getBuiltinTypeByTag(runtimeTag);
+                    }
+                    final IPointable name = names.get(fieldIdx);
+                    MsgPackAccessors.createFlatMsgPackAccessor(BuiltinType.ASTRING.getTypeTag()).apply(name,
+                            arg.getDataOutput());
+                    final PyTypeInfo valueInfo = pointableVisitor.getTypeInfo(fieldType, out);
+                    value.accept(pointableVisitor, valueInfo);
+                    fieldIdx++;
+                }
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Writes an ADM list as msgpack ARRAY32: the format byte, the item count as a four-byte int, then each
+     * item serialized through the supplied visitor.
+     */
+    public static class MsgPackListAccessor {
+
+        /**
+         * @param pointable list to serialize
+         * @param arg carries the collection type and the output to write to
+         * @param pointableVisitor visitor used to serialize each item
+         * @throws HyracksDataException wrapping any I/O failure raised while writing the header
+         */
+        public static Void access(AListVisitablePointable pointable, PyTypeInfo arg,
+                MsgPackPointableVisitor pointableVisitor) throws HyracksDataException {
+            final List<IVisitablePointable> elements = pointable.getItems();
+            final List<IVisitablePointable> elementTags = pointable.getItemTags();
+            final DataOutput out = arg.getDataOutput();
+            try {
+                out.writeByte(ARRAY32);
+                out.writeInt(elements.size());
+            } catch (IOException e) {
+                throw HyracksDataException.create(e);
+            }
+            for (int i = 0; i < elements.size(); i++) {
+                final IVisitablePointable element = elements.get(i);
+                // prefer the item type declared on the collection type
+                IAType elementType = ((AbstractCollectionType) arg.getType()).getItemType();
+                if (elementType.getTypeTag() == ATypeTag.ANY) {
+                    // declared type is ANY, so derive the type from the tag stored for this item
+                    final IVisitablePointable tagPointable = elementTags.get(i);
+                    final ATypeTag runtimeTag = EnumDeserializer.ATYPETAGDESERIALIZER
+                            .deserialize(tagPointable.getByteArray()[tagPointable.getStartOffset()]);
+                    elementType = TypeTagUtil.getBuiltinTypeByTag(runtimeTag);
+                }
+                final PyTypeInfo elementInfo = pointableVisitor.getTypeInfo(elementType, out);
+                element.accept(pointableVisitor, elementInfo);
+            }
+            return null;
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackPointableVisitor.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackPointableVisitor.java
new file mode 100644
index 0000000..08be6ac
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/msgpack/MsgPackPointableVisitor.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.external.library.msgpack;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.external.library.PyTypeInfo;
+import org.apache.asterix.om.pointables.AFlatValuePointable;
+import org.apache.asterix.om.pointables.AListVisitablePointable;
+import org.apache.asterix.om.pointables.ARecordVisitablePointable;
+import org.apache.asterix.om.pointables.visitor.IVisitablePointableVisitor;
+import org.apache.asterix.om.types.IAType;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Visitor that serializes visitable pointables to msgpack by delegating to {@link MsgPackAccessors}.
+ * The {@link PyTypeInfo} argument supplies the type of the visited value and the output to write to.
+ */
+public class MsgPackPointableVisitor implements IVisitablePointableVisitor<Void, PyTypeInfo> {
+
+    // cache of PyTypeInfo instances, keyed first by destination output and then by type
+    Map<DataOutput, Map<IAType, PyTypeInfo>> typeInfoMap = new HashMap<>();
+
+    @Override
+    public Void visit(AListVisitablePointable accessor, PyTypeInfo arg) throws HyracksDataException {
+        MsgPackAccessors.MsgPackListAccessor.access(accessor, arg, this);
+        return null;
+    }
+
+    @Override
+    public Void visit(ARecordVisitablePointable accessor, PyTypeInfo arg) throws HyracksDataException {
+        MsgPackAccessors.MsgPackRecordAccessor.access(accessor, arg, this);
+        return null;
+    }
+
+    @Override
+    public Void visit(AFlatValuePointable accessor, PyTypeInfo arg) throws HyracksDataException {
+        try {
+            MsgPackAccessors.createFlatMsgPackAccessor(arg.getType().getTypeTag()).apply(accessor,
+                    arg.getDataOutput());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+        return null;
+    }
+
+    /**
+     * Returns the cached {@link PyTypeInfo} for the given type and output, creating and caching it on first use.
+     *
+     * @param type type of the value to be serialized
+     * @param out output the value will be written to
+     * @return the {@link PyTypeInfo} associated with this (output, type) pair
+     */
+    public PyTypeInfo getTypeInfo(IAType type, DataOutput out) {
+        return typeInfoMap.computeIfAbsent(out, o -> new HashMap<>()).computeIfAbsent(type,
+                t -> new PyTypeInfo(t, out));
+    }
+
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
index 593bac6..741dad2 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/operators/ExternalAssignBatchRuntimeFactory.java
@@ -33,11 +33,14 @@ import java.util.List;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.ipc.PythonIPCProto;
 import org.apache.asterix.external.library.PythonLibraryEvaluator;
 import org.apache.asterix.external.library.PythonLibraryEvaluatorFactory;
 import org.apache.asterix.external.library.msgpack.MessageUnpackerToADM;
-import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.library.msgpack.MsgPackPointableVisitor;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.functions.IExternalFunctionDescriptor;
+import org.apache.asterix.om.pointables.PointableAllocator;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 import org.apache.hyracks.algebricks.core.algebra.base.Counter;
@@ -48,6 +51,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.comm.io.ArrayTupleBuilder;
 import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference;
 import org.msgpack.core.MessagePack;
@@ -58,12 +62,10 @@ import org.msgpack.core.buffer.ArrayBufferInput;
 public final class ExternalAssignBatchRuntimeFactory extends 
AbstractOneInputOneOutputRuntimeFactory {
 
     private static final long serialVersionUID = 1L;
-    private int[] outColumns;
+    private final int[] outColumns;
     private final IExternalFunctionDescriptor[] fnDescs;
     private final int[][] fnArgColumns;
 
-    private int rpcBufferSize;
-
     public ExternalAssignBatchRuntimeFactory(int[] outColumns, 
IExternalFunctionDescriptor[] fnDescs,
             int[][] fnArgColumns, int[] projectionList) {
         super(projectionList);
@@ -76,17 +78,14 @@ public final class ExternalAssignBatchRuntimeFactory 
extends AbstractOneInputOne
     public AbstractOneInputOneOutputPushRuntime 
createOneOutputPushRuntime(IHyracksTaskContext ctx) {
 
         final int[] projectionToOutColumns = new int[projectionList.length];
-        //this is a temporary bodge. these buffers need to work like vsize 
frames, or be absent entirely
-        int maxArgSz = ExternalDataUtils.getArgBufferSize();
-        rpcBufferSize = ExternalDataUtils.roundUpToNearestFrameSize(maxArgSz, 
ctx.getInitialFrameSize());
         for (int j = 0; j < projectionList.length; j++) {
             projectionToOutColumns[j] = Arrays.binarySearch(outColumns, 
projectionList[j]);
         }
 
         return new AbstractOneInputOneOutputOneFramePushRuntime() {
 
-            private ByteBuffer outputWrapper;
-            private List<ByteBuffer> argHolders;
+            private ArrayBackedValueStorage outputWrapper;
+            private List<ArrayBackedValueStorage> argHolders;
             ArrayTupleBuilder tupleBuilder;
             private List<Pair<Long, PythonLibraryEvaluator>> libraryEvaluators;
             private ATypeTag[][] nullCalls;
@@ -95,6 +94,9 @@ public final class ExternalAssignBatchRuntimeFactory extends 
AbstractOneInputOne
             private MessageUnpacker unpacker;
             private ArrayBufferInput unpackerInput;
             private List<Pair<ByteBuffer, Counter>> batchResults;
+            private MessageUnpackerToADM unpackerToADM;
+            private PointableAllocator pointableAllocator;
+            private MsgPackPointableVisitor pointableVisitor;
 
             @Override
             public void open() throws HyracksDataException {
@@ -116,23 +118,26 @@ public final class ExternalAssignBatchRuntimeFactory 
extends AbstractOneInputOne
                 }
                 argHolders = new ArrayList<>(fnArgColumns.length);
                 for (int i = 0; i < fnArgColumns.length; i++) {
-                    argHolders.add(ctx.allocateFrame(rpcBufferSize));
+                    argHolders.add(new ArrayBackedValueStorage());
                 }
-                outputWrapper = ctx.allocateFrame();
+                outputWrapper = new ArrayBackedValueStorage();
                 nullCalls = new ATypeTag[argHolders.size()][0];
                 numCalls = new int[fnArgColumns.length];
                 batchResults = new ArrayList<>(argHolders.size());
                 for (int i = 0; i < argHolders.size(); i++) {
-                    batchResults.add(new 
Pair<>(ctx.allocateFrame(rpcBufferSize), new Counter(-1)));
+                    batchResults.add(new 
Pair<>(ByteBuffer.allocate(ExternalDataConstants.DEFAULT_BUFFER_SIZE),
+                            new Counter(-1)));
                 }
                 unpackerInput = new ArrayBufferInput(new byte[0]);
                 unpacker = MessagePack.newDefaultUnpacker(unpackerInput);
+                unpackerToADM = new MessageUnpackerToADM();
+                pointableAllocator = new PointableAllocator();
+                pointableVisitor = new MsgPackPointableVisitor();
             }
 
             private void resetBuffers(int numTuples, int[] numCalls) {
                 for (int func = 0; func < fnArgColumns.length; func++) {
-                    argHolders.get(func).clear();
-                    argHolders.get(func).position(0);
+                    argHolders.get(func).reset();
                     if (nullCalls[func].length < numTuples) {
                         nullCalls[func] = new ATypeTag[numTuples];
                     }
@@ -188,6 +193,8 @@ public final class ExternalAssignBatchRuntimeFactory 
extends AbstractOneInputOne
 
             @Override
             public void nextFrame(ByteBuffer buffer) throws 
HyracksDataException {
+                /*TODO: this whole transposition stuff is not necessary
+                        the evaluator should accept a format that is a 
collection of rows, logically*/
                 tAccess.reset(buffer);
                 tupleBuilder.reset();
                 try {
@@ -211,14 +218,15 @@ public final class ExternalAssignBatchRuntimeFactory 
extends AbstractOneInputOne
                             }
                             if (argumentStatus == ATypeTag.TYPE) {
                                 if (cols.length > 0) {
-                                    argHolders.get(func).put(ARRAY16);
-                                    argHolders.get(func).putShort((short) 
cols.length);
+                                    
argHolders.get(func).getDataOutput().writeByte(ARRAY16);
+                                    
argHolders.get(func).getDataOutput().writeShort((short) cols.length);
                                 }
                                 for (int colIdx = 0; colIdx < cols.length; 
colIdx++) {
                                     ref.set(buffer.array(), 
tRef.getFieldStart(cols[colIdx]),
                                             tRef.getFieldLength(cols[colIdx]));
-                                    
PythonLibraryEvaluator.setArgument(fnDescs[func].getArgumentTypes()[colIdx], 
ref,
-                                            argHolders.get(func), 
fnDescs[func].getFunctionInfo().getNullCall());
+                                    
PythonIPCProto.visitValueRef(fnDescs[func].getArgumentTypes()[colIdx],
+                                            
argHolders.get(func).getDataOutput(), ref, pointableAllocator,
+                                            pointableVisitor, 
fnDescs[func].getFunctionInfo().getNullCall());
                                 }
                             } else {
                                 numCalls[func]--;
@@ -228,6 +236,7 @@ public final class ExternalAssignBatchRuntimeFactory 
extends AbstractOneInputOne
                             }
                         }
                     }
+
                     //TODO: maybe this could be done in parallel for each 
unique library evaluator?
                     for (int argHolderIdx = 0; argHolderIdx < 
argHolders.size(); argHolderIdx++) {
                         Pair<Long, PythonLibraryEvaluator> fnEval = 
libraryEvaluators.get(argHolderIdx);
@@ -236,15 +245,14 @@ public final class ExternalAssignBatchRuntimeFactory 
extends AbstractOneInputOne
                         if (columnResult != null) {
                             Pair<ByteBuffer, Counter> resultholder = 
batchResults.get(argHolderIdx);
                             if (resultholder.getFirst().capacity() < 
columnResult.capacity()) {
-                                
resultholder.setFirst(ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
-                                        columnResult.capacity(), 
ctx.getInitialFrameSize())));
+                                ByteBuffer realloc = 
ctx.reallocateFrame(resultholder.getFirst(),
+                                        columnResult.capacity() * 2, false);
+                                resultholder.setFirst(realloc);
                             }
                             ByteBuffer resultBuf = resultholder.getFirst();
-                            resultBuf.clear();
-                            resultBuf.position(0);
                             //offset 1 to skip message type
-                            System.arraycopy(columnResult.array(), 
columnResult.arrayOffset() + 1, resultBuf.array(),
-                                    resultBuf.arrayOffset(), 
columnResult.capacity() - 1);
+                            System.arraycopy(columnResult.array(), 1, 
resultBuf.array(), 0,
+                                    columnResult.remaining() - 1);
                             //wrapper for results and warnings arrays. always 
length 2
                             consumeAndGetBatchLength(resultBuf);
                             int numResults = (int) 
consumeAndGetBatchLength(resultBuf);
@@ -260,37 +268,30 @@ public final class ExternalAssignBatchRuntimeFactory 
extends AbstractOneInputOne
                             }
                         }
                     }
+
                     //decompose returned function columns into frame tuple 
format
                     for (int i = 0; i < numTuples; i++) {
                         tupleBuilder.reset();
                         for (int f = 0; f < projectionList.length; f++) {
                             int k = projectionToOutColumns[f];
                             if (k >= 0) {
-                                outputWrapper.clear();
-                                outputWrapper.position(0);
+                                outputWrapper.reset();
                                 Pair<ByteBuffer, Counter> result = 
batchResults.get(k);
-                                if (result.getFirst() != null) {
-                                    if (result.getFirst().capacity() > 
outputWrapper.capacity()) {
-                                        outputWrapper = 
ctx.allocateFrame(ExternalDataUtils.roundUpToNearestFrameSize(
-                                                outputWrapper.capacity(), 
ctx.getInitialFrameSize()));
-                                    }
-                                }
-                                int start = outputWrapper.arrayOffset();
                                 ATypeTag functionCalled = nullCalls[k][i];
                                 if (functionCalled == ATypeTag.TYPE) {
                                     if (result.getSecond().get() > 0) {
-                                        
MessageUnpackerToADM.unpack(result.getFirst(), outputWrapper, true);
+                                        
unpackerToADM.unpack(result.getFirst(), outputWrapper.getDataOutput(), true);
                                         
result.getSecond().set(result.getSecond().get() - 1);
                                     } else {
                                         //emit NULL for functions which failed 
with a warning
-                                        
outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                                        
outputWrapper.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                                     }
                                 } else if (functionCalled == ATypeTag.NULL) {
-                                    
outputWrapper.put(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
+                                    
outputWrapper.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                                 } else {
-                                    
outputWrapper.put(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
+                                    
outputWrapper.getDataOutput().writeByte(ATypeTag.SERIALIZED_MISSING_TYPE_TAG);
                                 }
-                                tupleBuilder.addField(outputWrapper.array(), 
start, start + outputWrapper.position());
+                                
tupleBuilder.addField(outputWrapper.getByteArray(), 0, 
outputWrapper.getLength());
                             } else {
                                 tupleBuilder.addField(tAccess, i, 
projectionList[f]);
                             }
diff --git 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java
 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java
index b337777..449ce13 100644
--- 
a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java
+++ 
b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/PrintTools.java
@@ -18,6 +18,7 @@
  */
 package org.apache.asterix.dataflow.data.nontagged.printers;
 
+import java.io.DataOutput;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -285,12 +286,39 @@ public class PrintTools {
             if (c == '"') {
                 os.write('"');
             }
-            os.write(c);
-            position += sz;
+            if (Character.isHighSurrogate(c)) {
+                position += writeSupplementaryChar(os, b, maxPosition, 
position, c, sz);
+                continue;
+            }
+            while (sz > 0) {
+                os.write(b[position]);
+                ++position;
+                --sz;
+            }
+            break;
         }
         os.write('"');
     }
 
+    public static void writeUTF8StringRaw(byte[] b, int s, int l, DataOutput 
os) throws IOException {
+        int utfLength = UTF8StringUtil.getUTFLength(b, s);
+        int position = s + UTF8StringUtil.getNumBytesToStoreLength(utfLength); 
// skip the bytes that store the string length
+        int maxPosition = position + utfLength;
+        while (position < maxPosition) {
+            char c = UTF8StringUtil.charAt(b, position);
+            int sz = UTF8StringUtil.charSize(b, position);
+            if (Character.isHighSurrogate(c)) {
+                position += writeSupplementaryChar(os, b, maxPosition, 
position, c, sz);
+                continue;
+            }
+            while (sz > 0) {
+                os.write(b[position]);
+                ++position;
+                --sz;
+            }
+        }
+    }
+
     public static void writeUTF8StringAsJSON(byte[] b, int s, int l, 
OutputStream os) throws IOException {
         int utfLength = UTF8StringUtil.getUTFLength(b, s);
         os.write('"');
@@ -420,4 +448,17 @@ public class PrintTools {
         return highSurrogateSize + lowSurrogateSize;
     }
 
+    //TODO: some way to dedupe this?
+    private static int writeSupplementaryChar(DataOutput os, byte[] src, int 
limit, int highSurrogatePos,
+            char highSurrogate, int highSurrogateSize) throws IOException {
+        final int lowSurrogatePos = highSurrogatePos + highSurrogateSize;
+        if (lowSurrogatePos >= limit) {
+            throw new IllegalStateException("malformed utf8 input");
+        }
+        final char lowSurrogate = UTF8StringUtil.charAt(src, lowSurrogatePos);
+        final int lowSurrogateSize = UTF8StringUtil.charSize(src, 
lowSurrogatePos);
+        os.write(new String(new char[] { highSurrogate, lowSurrogate 
}).getBytes(StandardCharsets.UTF_8));
+        return highSurrogateSize + lowSurrogateSize;
+    }
+
 }

Reply via email to