QPID-7932: [Java Broker, AMQP 1.0] Improve error handling when deserializing 
composite types

The composite type constructors are now auto-generated by an annotation 
processor, with generation driven by annotations on the type classes.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/6a267175
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/6a267175
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/6a267175

Branch: refs/heads/master
Commit: 6a2671755d49f849e69d1392eeaef2c1a9783713
Parents: 85459e5
Author: Lorenz Quack <lqu...@apache.org>
Authored: Mon Sep 25 10:36:14 2017 +0100
Committer: Lorenz Quack <lqu...@apache.org>
Committed: Thu Sep 28 14:30:17 2017 +0100

----------------------------------------------------------------------
 .../server/protocol/v1_0/CompositeType.java     |  34 ++
 .../v1_0/CompositeTypeConstructorGenerator.java | 443 ++++++++++++++++++
 .../protocol/v1_0/CompositeTypeField.java       |  35 ++
 .../javax.annotation.processing.Processor       |   1 +
 .../protocol/v1_0/DeserializationFactories.java | 345 ++++++++++++++
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  86 +---
 .../codec/AbstractCompositeTypeConstructor.java | 297 ++++++++++++
 .../codec/AbstractDescribedTypeConstructor.java |  68 +--
 .../v1_0/codec/CompoundTypeConstructor.java     | 193 --------
 .../protocol/v1_0/codec/ListConstructor.java    |  95 ++++
 .../protocol/v1_0/codec/MapConstructor.java     | 144 ++++++
 .../protocol/v1_0/codec/ValueHandler.java       |   6 +-
 .../protocol/v1_0/framing/FrameHandler.java     |   5 +
 .../protocol/v1_0/type/CompositeTypeField.java  |  33 --
 .../type/codec/AMQPDescribedTypeRegistry.java   |   2 +-
 .../v1_0/type/messaging/AbstractSection.java    |   6 +-
 .../protocol/v1_0/type/messaging/Accepted.java  |   2 +
 .../type/messaging/AmqpSequenceSection.java     |   4 +-
 .../v1_0/type/messaging/AmqpValueSection.java   |   4 +-
 .../messaging/ApplicationPropertiesSection.java |   4 +-
 .../v1_0/type/messaging/DataSection.java        |   4 +-
 .../v1_0/type/messaging/DeleteOnClose.java      |   2 +
 .../v1_0/type/messaging/DeleteOnNoLinks.java    |   2 +
 .../messaging/DeleteOnNoLinksOrMessages.java    |   2 +
 .../v1_0/type/messaging/DeleteOnNoMessages.java |   2 +
 .../messaging/DeliveryAnnotationsSection.java   |   4 +-
 .../v1_0/type/messaging/FooterSection.java      |   4 +-
 .../protocol/v1_0/type/messaging/Header.java    |  14 +-
 .../v1_0/type/messaging/HeaderSection.java      |   4 +-
 .../messaging/MessageAnnotationsSection.java    |   4 +-
 .../protocol/v1_0/type/messaging/Modified.java  |  16 +-
 .../v1_0/type/messaging/Properties.java         |  30 +-
 .../v1_0/type/messaging/PropertiesSection.java  |   4 +-
 .../protocol/v1_0/type/messaging/Received.java  |   8 +-
 .../protocol/v1_0/type/messaging/Rejected.java  |   6 +-
 .../protocol/v1_0/type/messaging/Released.java  |   2 +
 .../protocol/v1_0/type/messaging/Source.java    |  45 +-
 .../v1_0/type/messaging/StdDistMode.java        |  42 +-
 .../protocol/v1_0/type/messaging/Target.java    |  66 ++-
 .../v1_0/type/messaging/TerminusDurability.java |  48 +-
 .../type/messaging/TerminusExpiryPolicy.java    |  64 ++-
 .../messaging/codec/AcceptedConstructor.java    |  72 ---
 .../codec/DeleteOnCloseConstructor.java         |  72 ---
 .../codec/DeleteOnNoLinksConstructor.java       |  72 ---
 .../DeleteOnNoLinksOrMessagesConstructor.java   |  72 ---
 .../codec/DeleteOnNoMessagesConstructor.java    |  72 ---
 .../type/messaging/codec/HeaderConstructor.java | 209 ---------
 .../messaging/codec/ModifiedConstructor.java    | 153 ------
 .../messaging/codec/PropertiesConstructor.java  | 426 -----------------
 .../messaging/codec/ReceivedConstructor.java    | 126 -----
 .../messaging/codec/RejectedConstructor.java    |  98 ----
 .../messaging/codec/ReleasedConstructor.java    |  71 ---
 .../type/messaging/codec/SourceConstructor.java | 385 ---------------
 .../type/messaging/codec/TargetConstructor.java | 270 -----------
 .../v1_0/type/security/SaslChallenge.java       |  27 +-
 .../protocol/v1_0/type/security/SaslCode.java   |  70 ++-
 .../protocol/v1_0/type/security/SaslInit.java   |  37 +-
 .../v1_0/type/security/SaslMechanisms.java      |  22 +-
 .../v1_0/type/security/SaslOutcome.java         |  33 +-
 .../v1_0/type/security/SaslResponse.java        |  27 +-
 .../codec/SaslChallengeConstructor.java         |  99 ----
 .../security/codec/SaslInitConstructor.java     | 153 ------
 .../codec/SaslMechanismsConstructor.java        | 106 -----
 .../security/codec/SaslOutcomeConstructor.java  | 126 -----
 .../security/codec/SaslResponseConstructor.java |  99 ----
 .../v1_0/type/transaction/Coordinator.java      |  24 +-
 .../protocol/v1_0/type/transaction/Declare.java |  19 +-
 .../v1_0/type/transaction/Declared.java         |   6 +-
 .../v1_0/type/transaction/Discharge.java        |  26 +-
 .../type/transaction/TransactionErrors.java     |  62 +--
 .../type/transaction/TransactionalState.java    |   8 +-
 .../v1_0/type/transaction/TxnCapabilities.java  | 146 ------
 .../v1_0/type/transaction/TxnCapability.java    |  75 ++-
 .../codec/CoordinatorConstructor.java           | 117 -----
 .../transaction/codec/DeclareConstructor.java   | 100 ----
 .../transaction/codec/DeclaredConstructor.java  |  99 ----
 .../transaction/codec/DischargeConstructor.java | 126 -----
 .../codec/TransactionalStateConstructor.java    | 126 -----
 .../protocol/v1_0/type/transport/AmqpError.java |   1 -
 .../protocol/v1_0/type/transport/Attach.java    |  32 +-
 .../protocol/v1_0/type/transport/Begin.java     |  20 +-
 .../protocol/v1_0/type/transport/Close.java     |   6 +-
 .../v1_0/type/transport/ConnectionError.java    |   1 -
 .../protocol/v1_0/type/transport/Detach.java    |  10 +-
 .../v1_0/type/transport/Disposition.java        |  16 +-
 .../protocol/v1_0/type/transport/End.java       |   6 +-
 .../protocol/v1_0/type/transport/Error.java     |  18 +-
 .../protocol/v1_0/type/transport/Flow.java      |  26 +-
 .../protocol/v1_0/type/transport/LinkError.java |   1 -
 .../protocol/v1_0/type/transport/Open.java      |  24 +-
 .../v1_0/type/transport/SessionError.java       |   1 -
 .../protocol/v1_0/type/transport/Transfer.java  |  26 +-
 .../type/transport/codec/AttachConstructor.java | 465 -------------------
 .../type/transport/codec/BeginConstructor.java  | 303 ------------
 .../type/transport/codec/CloseConstructor.java  |  99 ----
 .../type/transport/codec/DetachConstructor.java | 153 ------
 .../transport/codec/DispositionConstructor.java | 234 ----------
 .../type/transport/codec/EndConstructor.java    |  99 ----
 .../type/transport/codec/ErrorConstructor.java  | 224 ---------
 .../type/transport/codec/FlowConstructor.java   | 370 ---------------
 .../type/transport/codec/OpenConstructor.java   | 373 ---------------
 .../transport/codec/TransferConstructor.java    | 369 ---------------
 .../protocol/v1_0/store/LinkStoreTestCase.java  |   2 +-
 .../tests/protocol/v1_0/DecodeErrorTest.java    | 196 ++++++++
 .../protocol/v1_0/messaging/MessageFormat.java  |  49 --
 105 files changed, 2115 insertions(+), 7020 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java
----------------------------------------------------------------------
diff --git 
a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java
 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java
new file mode 100644
index 0000000..41281c7
--- /dev/null
+++ 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a class as a composite type for which {@link CompositeTypeConstructorGenerator}
+ * generates a matching "Constructor" class. The generated constructor registers itself
+ * under both descriptors declared here.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface CompositeType
+{
+    /** Symbolic descriptor under which the generated constructor is registered. */
+    String symbolicDescriptor();
+    /** Numeric descriptor under which the generated constructor is registered. */
+    long numericDescriptor();
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
----------------------------------------------------------------------
diff --git 
a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
new file mode 100644
index 0000000..47308c8
--- /dev/null
+++ 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeConstructorGenerator.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Consumer;
+
+import javax.annotation.processing.AbstractProcessor;
+import javax.annotation.processing.Filer;
+import javax.annotation.processing.ProcessingEnvironment;
+import javax.annotation.processing.RoundEnvironment;
+import javax.lang.model.SourceVersion;
+import javax.lang.model.element.AnnotationMirror;
+import javax.lang.model.element.AnnotationValue;
+import javax.lang.model.element.Element;
+import javax.lang.model.element.ElementKind;
+import javax.lang.model.element.ExecutableElement;
+import javax.lang.model.element.PackageElement;
+import javax.lang.model.element.TypeElement;
+import javax.lang.model.element.VariableElement;
+import javax.lang.model.type.ArrayType;
+import javax.lang.model.type.DeclaredType;
+import javax.lang.model.type.TypeKind;
+import javax.lang.model.type.TypeMirror;
+import javax.lang.model.util.Elements;
+import javax.lang.model.util.Types;
+import javax.tools.Diagnostic;
+import javax.tools.JavaFileObject;
+
+import org.apache.qpid.server.License;
+
+
+public class CompositeTypeConstructorGenerator  extends AbstractProcessor
+{
+    /**
+     * Fully-qualified names of types that the generated code does not read directly:
+     * unless a deserialization converter is configured on the field, values of these
+     * types are obtained by calling the type's {@code valueOf} method on the raw value.
+     */
+    private static final List<String> RESTRICTED_TYPES = Arrays.asList(
+            "org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError",
+            
"org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError",
+            "org.apache.qpid.server.protocol.v1_0.type.transport.SessionError",
+            "org.apache.qpid.server.protocol.v1_0.type.transport.LinkError",
+            
"org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors",
+            
"org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode",
+            
"org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode",
+            "org.apache.qpid.server.protocol.v1_0.type.transport.Role",
+            "org.apache.qpid.server.protocol.v1_0.type.security.SaslCode",
+            
"org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy",
+            
"org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy",
+            "org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode",
+            
"org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability",
+            
"org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy",
+            
"org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability");
+
+
+    /**
+     * Declares support for whatever source version the running compiler considers latest.
+     *
+     * @return {@link SourceVersion#latest()}
+     */
+    @Override
+    public SourceVersion getSupportedSourceVersion()
+    {
+        final SourceVersion newestKnownVersion = SourceVersion.latest();
+        return newestKnownVersion;
+    }
+
+    /**
+     * Restricts this processor to the {@link CompositeType} annotation.
+     *
+     * @return a single-element set holding the annotation's class name
+     */
+    @Override
+    public Set<String> getSupportedAnnotationTypes()
+    {
+        final String handledAnnotation = CompositeType.class.getName();
+        return Collections.singleton(handledAnnotation);
+    }
+
+    /**
+     * Generates a constructor source file for every class annotated with {@link CompositeType}.
+     * Nothing is generated in the final processing round. Any exception raised while generating
+     * is reported through the messager as an error, including its stack trace.
+     *
+     * @param annotations      the annotation types requested to be processed (not inspected here)
+     * @param roundEnvironment the current processing round
+     * @return always {@code true}
+     */
+    @Override
+    public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnvironment)
+    {
+        if (roundEnvironment.processingOver())
+        {
+            return true;
+        }
+
+        final Filer sourceFiler = processingEnv.getFiler();
+        try
+        {
+            for (Element annotated : roundEnvironment.getElementsAnnotatedWith(CompositeType.class))
+            {
+                // Only classes get a generated constructor; other annotated elements are skipped.
+                if (annotated.getKind() != ElementKind.CLASS)
+                {
+                    continue;
+                }
+                generateCompositeTypeConstructor(sourceFiler, (TypeElement) annotated);
+            }
+        }
+        catch (Exception failure)
+        {
+            // Render the full stack trace into the compiler diagnostic so the cause is visible in the build log.
+            try (StringWriter traceBuffer = new StringWriter();
+                 PrintWriter traceWriter = new PrintWriter(traceBuffer))
+            {
+                failure.printStackTrace(traceWriter);
+                final String trace = traceBuffer.toString();
+                processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Unexpected Error: " + trace);
+            }
+            catch (IOException closeFailure)
+            {
+                final String reason = closeFailure.getLocalizedMessage();
+                processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Error: " + reason);
+            }
+        }
+        return true;
+    }
+
+
+    /**
+     * Writes the source file of the constructor class for one annotated composite type.
+     * The generated class is named {@code <SimpleName>Constructor} and is placed in the
+     * {@code codec} sub-package of the annotated class's package.
+     *
+     * Failures to create or write the file are reported through the messager as errors
+     * rather than thrown.
+     *
+     * @param filer       the filer used to create the source file
+     * @param typeElement the class annotated with {@link CompositeType}
+     */
+    private void generateCompositeTypeConstructor(final Filer filer, final TypeElement typeElement)
+    {
+        String objectQualifiedClassName = typeElement.getQualifiedName().toString();
+        String objectSimpleName = typeElement.getSimpleName().toString();
+        String compositeTypeConstructorNameSimpleName = objectSimpleName + "Constructor";
+        // getPackageOf also resolves the package of a nested class, whose enclosing element is a type
+        // rather than a package (a direct cast would fail with a ClassCastException there).
+        PackageElement packageElement = processingEnv.getElementUtils().getPackageOf(typeElement);
+        final String compositeTypeConstructorPackage = packageElement.getQualifiedName() + ".codec";
+        String compositeTypeConstructorName = compositeTypeConstructorPackage + "." + compositeTypeConstructorNameSimpleName;
+        final CompositeType annotation = typeElement.getAnnotation(CompositeType.class);
+
+        processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, "Generating composite constructor file for " + objectQualifiedClassName);
+
+        try
+        {
+            JavaFileObject factoryFile = filer.createSourceFile(compositeTypeConstructorName);
+            // try-with-resources closes the writer even when generation fails part-way through.
+            try (PrintWriter pw = new PrintWriter(new OutputStreamWriter(factoryFile.openOutputStream(), "UTF-8")))
+            {
+                pw.println("/*");
+                for(String headerLine : License.LICENSE)
+                {
+                    pw.println(" *" + headerLine);
+                }
+                pw.println(" */");
+                pw.println();
+                pw.print("package ");
+                pw.print(compositeTypeConstructorPackage);
+                pw.println(";");
+                pw.println();
+
+                pw.println("import java.util.List;");
+                pw.println();
+                pw.println("import org.apache.qpid.server.protocol.v1_0.codec.AbstractCompositeTypeConstructor;");
+                pw.println("import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;");
+                pw.println("import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;");
+                pw.println("import org.apache.qpid.server.protocol.v1_0.type.Symbol;");
+                pw.println("import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;");
+                pw.println("import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;");
+                pw.println("import org.apache.qpid.server.protocol.v1_0.type.transport.Error;");
+                pw.println("import " + objectQualifiedClassName + ";");
+                pw.println();
+
+                pw.println("public final class " + compositeTypeConstructorNameSimpleName + " extends AbstractCompositeTypeConstructor<"+ objectSimpleName +">");
+                pw.println("{");
+                pw.println("    private static final " + compositeTypeConstructorNameSimpleName + " INSTANCE = new " + compositeTypeConstructorNameSimpleName + "();");
+                pw.println();
+
+                // The single instance is registered under both the symbolic and the numeric descriptor.
+                pw.println("    public static void register(DescribedTypeConstructorRegistry registry)");
+                pw.println("    {");
+                pw.println("        registry.register(Symbol.valueOf(\"" + annotation.symbolicDescriptor() + "\"), INSTANCE);");
+                pw.println(String.format("        registry.register(UnsignedLong.valueOf(%#016x), INSTANCE);", annotation.numericDescriptor()));
+                pw.println("    }");
+                pw.println();
+
+                pw.println("    @Override");
+                pw.println("    protected String getTypeName()");
+                pw.println("    {");
+                pw.println("        return " + objectSimpleName + ".class.getSimpleName();");
+                pw.println("    }");
+                pw.println();
+
+                pw.println("    @Override");
+                pw.println("    protected " + objectSimpleName + " construct(final FieldValueReader fieldValueReader) throws AmqpErrorException");
+                pw.println("    {");
+                pw.println("        " + objectSimpleName + " obj = new " + objectSimpleName + "();");
+                pw.println();
+                generateAssigners(pw, typeElement);
+                pw.println("        return obj;");
+                pw.println("    }");
+                pw.println("}");
+
+                // PrintWriter never throws on write failures; it only records them, so ask explicitly.
+                if (pw.checkError())
+                {
+                    processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR,
+                                                             "Failed to write composite constructor file: "
+                                                             + compositeTypeConstructorName);
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR,
+                                                     "Failed to write composite constructor file: "
+                                                     + compositeTypeConstructorName
+                                                     + " - "
+                                                     + e.getLocalizedMessage());
+        }
+    }
+
+    /**
+     * Emits, for every field annotated with CompositeTypeField, the statements that read the
+     * field's value from the {@code fieldValueReader} and pass it to the matching setter on
+     * {@code obj}. Fields are processed in ascending annotation index; an index that does not
+     * match the field's position in that order is reported as an error.
+     *
+     * How a value is read depends on the field:
+     * array fields use {@code readArrayValue}; fields with a deserialization converter or of a
+     * restricted type are read as {@code Object} and converted; {@code Map} fields use
+     * {@code readMapValue}; everything else uses {@code readValue} with the field's own type.
+     * Non-mandatory fields are only set when the value read is not null.
+     *
+     * @param pw          writer of the generated source file
+     * @param typeElement the composite type whose fields are inspected
+     */
+    private void generateAssigners(final PrintWriter pw, final TypeElement typeElement)
+    {
+        Types typeUtils = processingEnv.getTypeUtils();
+
+        final List<AnnotatedField> annotatedFields = new ArrayList<>();
+        for (Element element : typeElement.getEnclosedElements())
+        {
+            if (element instanceof VariableElement && element.getKind() == ElementKind.FIELD)
+            {
+                boolean annotationFound = false;
+                for(AnnotationMirror annotationMirror : element.getAnnotationMirrors())
+                {
+                    if(annotationMirror.getAnnotationType().toString().equals("org.apache.qpid.server.protocol.v1_0.CompositeTypeField"))
+                    {
+                        if (annotationFound)
+                        {
+                            processingEnv.getMessager()
+                                         .printMessage(Diagnostic.Kind.ERROR,
+                                                       String.format(
+                                                               "More than one CompositeTypeField annotations on field '%s.%s'",
+                                                               typeElement.getSimpleName(),
+                                                               element.getSimpleName()));
+                        }
+                        annotationFound = true;
+                        annotatedFields.add(new AnnotatedField((VariableElement) element, annotationMirror));
+                    }
+                }
+            }
+        }
+
+        annotatedFields.sort(Comparator.comparingInt(AnnotatedField::getIndex));
+
+        for (int index = 0; index < annotatedFields.size(); ++index)
+        {
+            AnnotatedField annotatedField = annotatedFields.get(index);
+            final VariableElement variableElement = annotatedField.getVariableElement();
+            final String fieldName = stripUnderscore(variableElement.getSimpleName().toString());
+            // After sorting, each field's declared index must equal its position: no gaps, no duplicates.
+            if (annotatedField.getIndex() != index)
+            {
+                processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR,String.format(
+                        "Unexpected CompositeTypeField index '%d' is specified on field '%s' of '%s'. Expected %d.",
+                        annotatedField.getIndex(),
+                        fieldName,
+                        typeElement.getSimpleName(),
+                        index));
+            }
+
+            final String baseIndent = "        ";
+            if (variableElement.asType().getKind() == TypeKind.ARRAY)
+            {
+                final TypeMirror componentType = ((ArrayType) variableElement.asType()).getComponentType();
+                final String functionString;
+                if (annotatedField.getFactory() != null)
+                {
+                    functionString = "x -> " + annotatedField.getFactory() + "(x)";
+                }
+                // RESTRICTED_TYPES holds type names, so compare by name; a TypeMirror never equals a String.
+                else if (RESTRICTED_TYPES.contains(componentType.toString()))
+                {
+                    // Elements are converted one by one, so valueOf must be taken from the component type,
+                    // not from the array type.
+                    functionString = componentType + "::valueOf";
+                }
+                else
+                {
+                    functionString = "x -> (" + componentType + ") x";
+                }
+                pw.println(String.format("        %s %s = fieldValueReader.readArrayValue(%d, \"%s\", %s, %s.class, %s);",
+                                             annotatedField.getVariableElement().asType(),
+                                             fieldName,
+                                             index,
+                                             fieldName,
+                                             annotatedField.isMandatory(),
+                                             componentType,
+                                             functionString));
+                optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> {
+                    pw.println(indent + "obj." + getSetterName(variableElement) + "(" + fieldName + ");");
+                });
+            }
+            else if (annotatedField.getFactory() != null || RESTRICTED_TYPES.contains(variableElement.asType().toString()))
+            {
+                String functionName = annotatedField.getFactory() != null ? annotatedField.getFactory() : variableElement.asType().toString() + ".valueOf";
+                pw.println(String.format("        Object %s = fieldValueReader.readValue(%d, \"%s\", %s, Object.class);",
+                                         fieldName,
+                                         index,
+                                         fieldName,
+                                         annotatedField.isMandatory()));
+                // The conversion is wrapped so that a failing converter surfaces as a decode error.
+                optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> {
+                    pw.println(indent + "try");
+                    pw.println(indent + "{");
+                    pw.println(indent + "    obj." + getSetterName(variableElement) + "(" + functionName + "(" + fieldName + "));");
+                    pw.println(indent + "}");
+                    pw.println(indent + "catch (RuntimeException e)");
+                    pw.println(indent + "{");
+                    pw.println(indent + "    Error error = new Error(AmqpError.DECODE_ERROR, \"Could not decode value field '" + fieldName + "' of '" + typeElement.getSimpleName() + "'\");");
+                    pw.println(indent + "    throw new AmqpErrorException(error, e);");
+                    pw.println(indent + "}");
+                });
+            }
+            else if (typeUtils.isSameType(typeUtils.erasure(variableElement.asType()),
+                                          getErasure(processingEnv, "java.util.Map")))
+            {
+                List<? extends TypeMirror> args = ((DeclaredType) variableElement.asType()).getTypeArguments();
+                if (args.size() != 2)
+                {
+                    processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR,
+                                                             "Map types must have exactly two type arguments");
+                    // The error above already fails the build; skip the field instead of running into
+                    // an IndexOutOfBoundsException on the missing type arguments.
+                    continue;
+                }
+                pw.println(String.format("        %s %s = fieldValueReader.readMapValue(%d, \"%s\", %s, %s.class, %s.class);",
+                                         annotatedField.getVariableElement().asType(),
+                                         fieldName,
+                                         index,
+                                         fieldName,
+                                         annotatedField.isMandatory(),
+                                         args.get(0),
+                                         args.get(1)));
+                optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> {
+                    pw.println(indent + "obj." + getSetterName(variableElement) + "(" + fieldName + ");");
+                });
+            }
+            else
+            {
+                pw.println(String.format("        %s %s = fieldValueReader.readValue(%d, \"%s\", %s, %s.class);",
+                                         annotatedField.getVariableElement().asType(),
+                                         fieldName,
+                                         index,
+                                         fieldName,
+                                         annotatedField.isMandatory(),
+                                         annotatedField.getVariableElement().asType()));
+                optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> {
+                    pw.println(indent + "obj." + getSetterName(variableElement) + "(" + fieldName + ");");
+                });
+            }
+
+            pw.println();
+        }
+    }
+
+    /**
+     * Runs the given emitter, optionally surrounding its output with a generated
+     * {@code if (<fieldName> != null) { ... }} block.
+     *
+     * @param wrap      whether to emit the surrounding null check
+     * @param pw        writer of the generated source file
+     * @param indent    indentation of the null check; the emitter receives it extended by four spaces when wrapping
+     * @param fieldName name of the generated local variable that is tested against null
+     * @param f         emitter of the guarded statements, called with the indentation to use
+     */
+    private void optionallyWrapInNullCheck(boolean wrap, PrintWriter pw, String indent, String fieldName, Consumer<String> f)
+    {
+        if (!wrap)
+        {
+            f.accept(indent);
+            return;
+        }
+
+        pw.println(indent + "if (" + fieldName + " != null)");
+        pw.println(indent + "{");
+        final String nestedIndent = indent + "    ";
+        f.accept(nestedIndent);
+        // The closing brace uses the nested indentation with its first four characters removed.
+        final String closingIndent = nestedIndent.substring(4);
+        pw.println(closingIndent + "}");
+    }
+
+    /**
+     * Derives the setter name for a field: the field name without a leading underscore,
+     * first character upper-cased, prefixed with {@code set}.
+     *
+     * @param variableElement the field
+     * @return the name of the setter the generated code calls
+     */
+    private String getSetterName(final VariableElement variableElement)
+    {
+        final String fieldName = stripUnderscore(variableElement.getSimpleName().toString());
+        // Locale.ROOT keeps the generated identifier independent of the build machine's locale;
+        // with a Turkish default locale "index" would otherwise become "setİndex".
+        return "set" + fieldName.substring(0, 1).toUpperCase(java.util.Locale.ROOT) + fieldName.substring(1);
+    }
+
+    /**
+     * Removes a single leading underscore from a field name, if present.
+     *
+     * @param fieldName the declared field name
+     * @return the name without its first underscore, or the name unchanged
+     */
+    private String stripUnderscore(final String fieldName)
+    {
+        final boolean hasPrefix = fieldName.startsWith("_");
+        return hasPrefix ? fieldName.substring(1) : fieldName;
+    }
+
+    /**
+     * Pairs a field with its CompositeTypeField annotation and the attribute values
+     * ({@code index}, {@code mandatory}, {@code deserializationConverter}) read from it.
+     */
+    private static class AnnotatedField
+    {
+        private final VariableElement _variableElement;
+        private final AnnotationMirror _annotationMirror;
+        private final int _index;
+        private final String _factory;
+        private final boolean _mandatory;
+
+        /**
+         * Reads the annotation attributes. Attributes missing from the mirror's element values
+         * fall back to: index -1, not mandatory, no converter.
+         *
+         * @param variableElement  the annotated field
+         * @param annotationMirror the CompositeTypeField annotation found on the field
+         */
+        public AnnotatedField(final VariableElement variableElement, final AnnotationMirror annotationMirror)
+        {
+            _variableElement = variableElement;
+            _annotationMirror = annotationMirror;
+            String factory = null;
+            boolean mandatory = false;
+            int index = -1;
+            for (Map.Entry<? extends ExecutableElement,? extends AnnotationValue> entry : annotationMirror.getElementValues().entrySet())
+            {
+                if ("index".contentEquals(entry.getKey().getSimpleName()))
+                {
+                    index = (int) entry.getValue().getValue();
+                }
+                else if ("deserializationConverter".contentEquals(entry.getKey().getSimpleName()))
+                {
+                    final String converter = (String) entry.getValue().getValue();
+                    // An explicitly written "" equals the annotation's declared default and means
+                    // "no converter"; treating it as a converter would generate a call to an empty name.
+                    factory = converter.isEmpty() ? null : converter;
+                }
+                else if ("mandatory".contentEquals(entry.getKey().getSimpleName()))
+                {
+                    mandatory = (boolean) entry.getValue().getValue();
+                }
+            }
+            _index = index;
+            _mandatory = mandatory;
+            _factory = factory;
+        }
+
+        /** @return the annotated field */
+        public VariableElement getVariableElement()
+        {
+            return _variableElement;
+        }
+
+        /** @return the annotation mirror the attribute values were read from */
+        public AnnotationMirror getAnnotationMirror()
+        {
+            return _annotationMirror;
+        }
+
+        /** @return the declared index, or -1 if none was found */
+        public int getIndex()
+        {
+            return _index;
+        }
+
+        /** @return the deserialization converter, or null if none is configured */
+        public String getFactory()
+        {
+            return _factory;
+        }
+
+        /** @return whether the field is declared mandatory */
+        public boolean isMandatory()
+        {
+            return _mandatory;
+        }
+    }
+
+    /**
+     * Looks up a type by name and returns its erasure.
+     *
+     * @param processingEnv the processing environment supplying the element and type utilities
+     * @param className     fully-qualified name of the type to look up
+     * @return the erasure of the named type
+     */
+    private static TypeMirror getErasure(ProcessingEnvironment processingEnv, final String className)
+    {
+        final Types typeUtils = processingEnv.getTypeUtils();
+        final Elements elementUtils = processingEnv.getElementUtils();
+        final TypeElement namedType = elementUtils.getTypeElement(className);
+        final TypeMirror declaredType = namedType.asType();
+        return typeUtils.erasure(declaredType);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java
----------------------------------------------------------------------
diff --git 
a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java
 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java
new file mode 100644
index 0000000..be2893e
--- /dev/null
+++ 
b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Field-level annotation carrying per-field metadata of a composite type.
+ * It is retained at runtime and may only be applied to fields.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.FIELD)
+public @interface CompositeTypeField
+{
+    /** Index of the annotated field; has no default, so it must always be specified. */
+    // NOTE(review): presumably the position of the field in the encoded composite - confirm against the generator
+    int index();
+    /** Whether the field is mandatory; defaults to {@code false}. */
+    boolean mandatory() default false;
+    /** Name of a deserialization converter; defaults to the empty string. */
+    // NOTE(review): presumably an empty string means "no converter" - confirm against the generator
+    String deserializationConverter() default "";
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-codegen/src/main/resources/META-INF/services/javax.annotation.processing.Processor
----------------------------------------------------------------------
diff --git 
a/broker-codegen/src/main/resources/META-INF/services/javax.annotation.processing.Processor
 
b/broker-codegen/src/main/resources/META-INF/services/javax.annotation.processing.Processor
index c8ef012..8e295cf 100644
--- 
a/broker-codegen/src/main/resources/META-INF/services/javax.annotation.processing.Processor
+++ 
b/broker-codegen/src/main/resources/META-INF/services/javax.annotation.processing.Processor
@@ -25,3 +25,4 @@ 
org.apache.qpid.server.model.validation.AttributeFieldValidation
 org.apache.qpid.server.model.validation.ManagedAnnotationValidator
 org.apache.qpid.server.model.validation.OperationAnnotationValidator
 org.apache.qpid.server.model.validation.ContentHeaderAnnotationValidator
+org.apache.qpid.server.protocol.v1_0.CompositeTypeConstructorGenerator

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
new file mode 100644
index 0000000..f76edf4
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.lang.reflect.Array;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.DistributionMode;
+import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition;
+import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy;
+import org.apache.qpid.server.protocol.v1_0.type.Symbol;
+import org.apache.qpid.server.protocol.v1_0.type.TxnCapability;
+import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode;
+import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError;
+
+public class DeserializationFactories
+{
+    @SuppressWarnings("unused")
+    public static Map<Symbol, Object> convertToNodeProperties(final Object 
value) throws AmqpErrorException
+    {
+        if (value != null)
+        {
+            if (!(value instanceof Map))
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Cannot construct 
'node-properties' from type '%s'",
+                                                           
value.getClass().getSimpleName()));
+            }
+            Map<Symbol, Object> nodeProperties = new LinkedHashMap<>();
+            Map<?, ?> map = (Map<?, ?>) value;
+            for (Map.Entry<?,?> entry : map.entrySet())
+            {
+                Object key = entry.getKey();
+                if (!(key instanceof Symbol))
+                {
+                    throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                 
String.format("'node-properties' must have only keys of type 'symbol' but got 
'%s'",
+                                                               
key.getClass().getSimpleName()));
+                }
+                if (Session_1_0.LIFETIME_POLICY.equals(key))
+                {
+                    final Object lifetimePolicy = entry.getValue();
+                    if (!(lifetimePolicy instanceof LifetimePolicy))
+                    {
+                        String typeName = lifetimePolicy == null ? null : 
lifetimePolicy.getClass().getSimpleName();
+                        throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                     String.format("Cannot 
construct 'lifetime-policy' from type '%s'",
+                                                                   typeName));
+                    }
+                    nodeProperties.put((Symbol) key, lifetimePolicy);
+                }
+                else if (Symbol.valueOf("supported-dist-modes").equals(key))
+                {
+                    final Object distributionMode = entry.getValue();
+                    final DistributionMode[] converted;
+                    if (distributionMode == null)
+                    {
+                        converted = null;
+
+                    }
+                    else if (distributionMode.getClass().isArray())
+                    {
+                        converted = new 
DistributionMode[Array.getLength(distributionMode)];
+                        for (int i = 0; i < converted.length; ++i)
+                        {
+                            final Object item = Array.get(distributionMode, i);
+                            if (item == null)
+                            {
+                                throw new 
AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                             "'null' not 
allowed in 'supported-distribution-modes'");
+                            }
+                            converted[i] = convertToDistributionMode(item);
+                        }
+                    }
+                    else
+                    {
+                        converted = new DistributionMode[] 
{convertToDistributionMode(distributionMode)};
+                    }
+                    nodeProperties.put((Symbol) key, converted);
+                }
+                else
+                {
+                    nodeProperties.put((Symbol) key, entry.getValue());
+                }
+            }
+            return nodeProperties;
+        }
+        else
+        {
+            return null;
+        }
+    }
+
+    @SuppressWarnings("unused")
+    public static DistributionMode convertToDistributionMode(final Object 
value) throws AmqpErrorException
+    {
+        DistributionMode distributionMode = null;
+        if (value != null)
+        {
+            if (value instanceof DistributionMode)
+            {
+                distributionMode = (DistributionMode) value;
+            }
+            else if (value instanceof Symbol)
+            {
+                distributionMode = StdDistMode.valueOf(value);
+                if (distributionMode == null)
+                {
+                    distributionMode = new UnknownDistributionMode((Symbol) 
value);
+                }
+            }
+            else
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Cannot construct 
'distribution-mode' from type '%s'",
+                                                           
value.getClass().getSimpleName()));
+            }
+        }
+        return distributionMode;
+    }
+
+    @SuppressWarnings("unused")
+    public static TxnCapability convertToTxnCapability(final Object value) 
throws AmqpErrorException
+    {
+        TxnCapability capability = null;
+        if (value != null)
+        {
+            if (value instanceof TxnCapability)
+            {
+                capability = (TxnCapability) value;
+            }
+            else if (value instanceof Symbol)
+            {
+                capability = 
org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability.valueOf(value);
+                if (capability == null)
+                {
+                    capability = new UnknownTxnCapability((Symbol) value);
+                }
+            }
+            else
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Cannot construct 
'txn-capability' from type '%s'",
+                                                           
value.getClass().getSimpleName()));
+            }
+        }
+        return capability;
+    }
+
+    @SuppressWarnings("unsued")
+    public static ErrorCondition convertToErrorCondition(final Object value) 
throws AmqpErrorException
+    {
+        ErrorCondition condition = null;
+        if (value != null)
+        {
+            if (value instanceof ErrorCondition)
+            {
+                condition = (ErrorCondition) value;
+            }
+            else if (value instanceof Symbol)
+            {
+                condition = AmqpError.valueOf(value);
+                if (condition == null)
+                {
+                    condition = ConnectionError.valueOf(value);
+                    if (condition == null)
+                    {
+                        condition = SessionError.valueOf(value);
+                        if (condition == null)
+                        {
+                            condition = LinkError.valueOf(value);
+                            if (condition == null)
+                            {
+                                condition = TransactionErrors.valueOf(value);
+                                if (condition == null)
+                                {
+                                    condition = new 
UnknownErrorCondition((Symbol) value);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+            else
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Cannot construct 
'error-condition' from type '%s'",
+                                                           
value.getClass().getSimpleName()));
+            }
+        }
+        return condition;
+    }
+
+    /**
+     * {@link ErrorCondition} wrapping a symbol for which none of the known error condition lookups produced a
+     * match. Equality, hash code and string form are all derived from the wrapped symbol.
+     */
+    private static final class UnknownErrorCondition implements ErrorCondition
+    {
+        private final Symbol _value;
+
+        public UnknownErrorCondition(final Symbol value)
+        {
+            _value = value;
+        }
+
+        @Override
+        public Symbol getValue()
+        {
+            return _value;
+        }
+
+        @Override
+        public boolean equals(final Object other)
+        {
+            if (this == other)
+            {
+                return true;
+            }
+            // only instances of exactly this class wrapping an equal symbol are considered equal
+            return other != null
+                   && getClass() == other.getClass()
+                   && _value.equals(((UnknownErrorCondition) other)._value);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return _value.hashCode();
+        }
+
+        @Override
+        public String toString()
+        {
+            return _value.toString();
+        }
+    }
+
+    /**
+     * {@link TxnCapability} wrapping a symbol that the known capability lookup did not recognise.
+     * Equality, hash code and string form are all derived from the wrapped symbol.
+     */
+    private static class UnknownTxnCapability implements TxnCapability
+    {
+        private final Symbol _value;
+
+        public UnknownTxnCapability(final Symbol value)
+        {
+            _value = value;
+        }
+
+        @Override
+        public boolean equals(final Object other)
+        {
+            if (this == other)
+            {
+                return true;
+            }
+            // only instances of exactly this class wrapping an equal symbol are considered equal
+            return other != null
+                   && getClass() == other.getClass()
+                   && _value.equals(((UnknownTxnCapability) other)._value);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return _value.hashCode();
+        }
+
+        @Override
+        public String toString()
+        {
+            return _value.toString();
+        }
+    }
+
+    /**
+     * {@link DistributionMode} wrapping a symbol that the standard distribution mode lookup did not recognise.
+     * Equality, hash code and string form are all derived from the wrapped symbol.
+     */
+    private static class UnknownDistributionMode implements DistributionMode
+    {
+        private final Symbol _value;
+
+        public UnknownDistributionMode(final Symbol value)
+        {
+            _value = value;
+        }
+
+        @Override
+        public boolean equals(final Object other)
+        {
+            if (this == other)
+            {
+                return true;
+            }
+            // only instances of exactly this class wrapping an equal symbol are considered equal
+            return other != null
+                   && getClass() == other.getClass()
+                   && _value.equals(((UnknownDistributionMode) other)._value);
+        }
+
+        @Override
+        public int hashCode()
+        {
+            return _value.hashCode();
+        }
+
+        @Override
+        public String toString()
+        {
+            return _value.toString();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 801810e..a78df1a 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -988,15 +988,7 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
     @Override
     public void block(final Queue<?> queue)
     {
-        getAMQPConnection().doOnIOThreadAsync(
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        doBlock(queue);
-                    }
-                });
+        getAMQPConnection().doOnIOThreadAsync(() -> doBlock(queue));
     }
 
     private void doBlock(final Queue<?> queue)
@@ -1025,15 +1017,7 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
     @Override
     public void unblock(final Queue<?> queue)
     {
-        getAMQPConnection().doOnIOThreadAsync(
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        doUnblock(queue);
-                    }
-                });
+        getAMQPConnection().doOnIOThreadAsync(() -> doUnblock(queue));
     }
 
     private void doUnblock(final Queue<?> queue)
@@ -1058,15 +1042,7 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
     @Override
     public void block()
     {
-        getAMQPConnection().doOnIOThreadAsync(
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        doBlock();
-                    }
-                });
+        getAMQPConnection().doOnIOThreadAsync(this::doBlock);
     }
 
     private void doBlock()
@@ -1088,15 +1064,7 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
     @Override
     public void unblock()
     {
-        getAMQPConnection().doOnIOThreadAsync(
-                new Runnable()
-                {
-                    @Override
-                    public void run()
-                    {
-                        doUnblock();
-                    }
-                });
+        getAMQPConnection().doOnIOThreadAsync(this::doUnblock);
     }
 
     private void doUnblock()
@@ -1388,38 +1356,32 @@ public class Session_1_0 extends 
AbstractAMQPSession<Session_1_0, ConsumerTarget
         @Override
         public void onSuccess(final T endpoint)
         {
-            doOnIOThreadAsync(new Runnable()
-            {
-                @Override
-                public void run()
+            doOnIOThreadAsync(() -> {
+                _associatedLinkEndpoints.add(endpoint);
+                endpoint.setLocalHandle(findNextAvailableOutputHandle());
+                if (endpoint instanceof ErrantLinkEndpoint)
                 {
-                    _associatedLinkEndpoints.add(endpoint);
-                    endpoint.setLocalHandle(findNextAvailableOutputHandle());
-                    if (endpoint instanceof ErrantLinkEndpoint)
+                    endpoint.sendAttach();
+                    ((ErrantLinkEndpoint) endpoint).closeWithError();
+                }
+                else
+                {
+                    if (endpoint instanceof StandardReceivingLinkEndpoint
+                        && (_blockingEntities.contains(Session_1_0.this)
+                            || 
_blockingEntities.contains(((StandardReceivingLinkEndpoint) 
endpoint).getReceivingDestination())))
+                    {
+                        endpoint.setStopped(true);
+                    }
+                    _inputHandleToEndpoint.put(_attach.getHandle(), endpoint);
+                    if (!_endpointToOutputHandle.containsKey(endpoint))
                     {
+                        _endpointToOutputHandle.put(endpoint, 
endpoint.getLocalHandle());
                         endpoint.sendAttach();
-                        ((ErrantLinkEndpoint) endpoint).closeWithError();
+                        endpoint.start();
                     }
                     else
                     {
-                        if (endpoint instanceof StandardReceivingLinkEndpoint
-                            && (_blockingEntities.contains(Session_1_0.this)
-                                || 
_blockingEntities.contains(((StandardReceivingLinkEndpoint) 
endpoint).getReceivingDestination())))
-                        {
-                            endpoint.setStopped(true);
-                        }
-                        _inputHandleToEndpoint.put(_attach.getHandle(), 
endpoint);
-                        if (!_endpointToOutputHandle.containsKey(endpoint))
-                        {
-                            _endpointToOutputHandle.put(endpoint, 
endpoint.getLocalHandle());
-                            endpoint.sendAttach();
-                            endpoint.start();
-                        }
-                        else
-                        {
-                            // TODO - link stealing???
-                        }
-
+                        // TODO - link stealing???
                     }
                 }
             });

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java
new file mode 100644
index 0000000..c417b69
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+
+import java.lang.reflect.Array;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
+
+public abstract class AbstractCompositeTypeConstructor<T> implements 
DescribedTypeConstructor<T>
+{
+    /**
+     * Returns a fresh {@link FieldValueReader}; none of the arguments is inspected here, the actual decoding
+     * happens when the returned constructor is invoked.
+     */
+    @Override
+    public TypeConstructor<T> construct(final Object descriptor,
+                                        final List<QpidByteBuffer> in,
+                                        final int[] originalPositions,
+                                        final ValueHandler valueHandler) 
throws AmqpErrorException
+    {
+        return new FieldValueReader();
+    }
+
+    protected abstract String getTypeName();
+
+    protected abstract T construct(FieldValueReader x) throws 
AmqpErrorException;
+
+    protected class FieldValueReader implements TypeConstructor<T>
+    {
+        private List<QpidByteBuffer> _in;
+        private ValueHandler _valueHandler;
+        private int _count;
+
+        /**
+         * Remembers the input buffers and value handler on this reader and then builds the composite value
+         * via {@code constructType()}.
+         */
+        @Override
+        public T construct(final List<QpidByteBuffer> in, final ValueHandler 
handler) throws AmqpErrorException
+        {
+            _in = in;
+            _valueHandler = handler;
+            return constructType();
+        }
+
+        /**
+         * Reads the constructor and list header of the encoded composite and then builds the value via
+         * {@link AbstractCompositeTypeConstructor#construct(FieldValueReader)}.
+         * <p>
+         * For a {@link ListConstructor} the header is a size field followed by a count field; both are read as a
+         * single byte when {@code getSize()} is 1 and as an {@code int} otherwise. A {@link ZeroListConstructor}
+         * yields a field count of zero.
+         *
+         * @return the constructed composite value
+         * @throws AmqpErrorException with {@link AmqpError#DECODE_ERROR} if the header or the announced number of
+         *                            bytes is not available, the count is negative, or the value is not a list
+         */
+        private T constructType() throws AmqpErrorException
+        {
+            final TypeConstructor typeConstructor = _valueHandler.readConstructor(_in);
+            if (typeConstructor instanceof ListConstructor)
+            {
+                final long fieldWidth = ((ListConstructor) typeConstructor).getSize();
+                // size AND count are read below, so both must be available before the buffers are touched
+                final long headerLength = fieldWidth == 1 ? 2 : 2L * Integer.BYTES;
+                long remaining = QpidByteBufferUtils.remaining(_in);
+                if (remaining < headerLength)
+                {
+                    throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                 String.format("Not sufficient data for deserialization of '%s'."
+                                                               + " Expected at least %d bytes. Got %d bytes.",
+                                                               getTypeName(),
+                                                               headerLength,
+                                                               remaining));
+                }
+
+                final long size;
+                if (fieldWidth == 1)
+                {
+                    size = QpidByteBufferUtils.get(_in) & 0xFF;
+                    _count = QpidByteBufferUtils.get(_in) & 0xFF;
+                }
+                else
+                {
+                    // interpret the 32 bit size as unsigned so that huge values cannot slip past the check below
+                    size = QpidByteBufferUtils.getInt(_in) & 0xFFFFFFFFL;
+                    _count = QpidByteBufferUtils.getInt(_in);
+                }
+
+                // only the size field is subtracted: the original check compares size against everything after it
+                remaining -= fieldWidth;
+                if (remaining < size)
+                {
+                    throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                 String.format("Not sufficient data for deserialization of '%s'."
+                                                               + " Expected at least %d bytes. Got %d bytes.",
+                                                               getTypeName(),
+                                                               size,
+                                                               remaining));
+                }
+                if (_count < 0)
+                {
+                    // a count beyond Integer.MAX_VALUE would otherwise make every field look absent
+                    throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                 String.format("Unexpected format when deserializing of '%s'",
+                                                               getTypeName()));
+                }
+            }
+            else if (typeConstructor instanceof ZeroListConstructor)
+            {
+                _count = 0;
+            }
+            else
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Unexpected format when deserializing of '%s'",
+                                                           getTypeName()));
+            }
+            return AbstractCompositeTypeConstructor.this.construct(this);
+        }
+
+
+        /**
+         * Reads the next field value and checks it against the expected type.
+         * <p>
+         * Nothing is read from the input when {@code fieldIndex} is not below the field count taken from the
+         * list header; such a field counts as absent.
+         *
+         * @param fieldIndex   index of the field being read
+         * @param fieldName    field name used in error messages
+         * @param mandatory    whether an absent or {@code null} value is an error
+         * @param expectedType type the parsed value must be an instance of
+         * @return the parsed value, or {@code null} if the field is absent or encoded as null
+         * @throws AmqpErrorException with {@link AmqpError#DECODE_ERROR} if a mandatory field is missing or the
+         *                            value has the wrong type
+         */
+        public <F> F readValue(final int fieldIndex,
+                               final String fieldName,
+                               final boolean mandatory,
+                               final Class<F> expectedType) throws AmqpErrorException
+        {
+            final boolean present = fieldIndex < _count;
+            final Object parsed = present ? _valueHandler.parse(_in) : null;
+
+            if (parsed == null)
+            {
+                if (mandatory)
+                {
+                    throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                 String.format("Mandatory field '%s' of '%s' was not provided",
+                                                               fieldName,
+                                                               getTypeName()));
+                }
+                return null;
+            }
+
+            if (!expectedType.isInstance(parsed))
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format(
+                                                     "Wrong type for field '%s' of '%s'. Expected '%s' but got '%s'.",
+                                                     fieldName,
+                                                     getTypeName(),
+                                                     expectedType.getSimpleName(),
+                                                     parsed.getClass().getSimpleName()));
+            }
+
+            return expectedType.cast(parsed);
+        }
+
+        /**
+         * Reads the next field as a map with the given key and value types.
+         * <p>
+         * Nothing is read from the input when {@code fieldIndex} is not below the field count taken from the
+         * list header. Otherwise the next constructor is read: a {@link MapConstructor} builds the map, a
+         * {@link NullTypeConstructor} stands for an absent value, anything else is a decode error.
+         *
+         * @param fieldIndex        index of the field being read
+         * @param fieldName         field name used in error messages
+         * @param mandatory         whether an absent or null value is an error
+         * @param expectedKeyType   key type handed to the map constructor
+         * @param expectedValueType value type handed to the map constructor
+         * @return the decoded map, or {@code null} if the field is absent or encoded as null
+         * @throws AmqpErrorException with {@link AmqpError#DECODE_ERROR} if a mandatory field is missing or the
+         *                            field is not encoded as a map
+         */
+        public <K, V> Map<K, V> readMapValue(final int fieldIndex,
+                                             final String fieldName,
+                                             final boolean mandatory,
+                                             final Class<K> expectedKeyType,
+                                             final Class<V> expectedValueType)
+                throws AmqpErrorException
+        {
+            if (fieldIndex >= _count)
+            {
+                if (!mandatory)
+                {
+                    return null;
+                }
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Mandatory field '%s' of '%s' was not provided",
+                                                           fieldName,
+                                                           getTypeName()));
+            }
+
+            final TypeConstructor fieldConstructor = _valueHandler.readConstructor(_in);
+            if (fieldConstructor instanceof MapConstructor)
+            {
+                return ((MapConstructor) fieldConstructor).construct(_in,
+                                                                     _valueHandler,
+                                                                     expectedKeyType,
+                                                                     expectedValueType);
+            }
+
+            if (!(fieldConstructor instanceof NullTypeConstructor))
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             String.format("Could not decode value field '%s' of '%s'",
+                                                           fieldName,
+                                                           getTypeName()));
+            }
+
+            // the field was explicitly encoded as null
+            if (mandatory)
+            {
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                             "Mandatory field '%s' of '%s' was not provided",
+                                             fieldName,
+                                             getTypeName());
+            }
+            return null;
+        }
+
+        /**
+         * Reads the next field as an array of {@code expectedType}.
+         * <p>
+         * Nothing is read from the input when {@code fieldIndex} is not below the field count taken from the
+         * list header. An array whose component type is assignable to {@code expectedType} is returned as is;
+         * any other array (primitive arrays included) is converted element by element with {@code converter}.
+         * A single non-array value becomes a one-element array and is converted first if it is not already an
+         * instance of {@code expectedType}.
+         *
+         * @param fieldIndex   index of the field being read
+         * @param fieldName    field name used in error messages
+         * @param mandatory    whether an absent or {@code null} value is an error
+         * @param expectedType component type of the returned array
+         * @param converter    converts values that are not already of {@code expectedType}
+         * @return the decoded array, or {@code null} if the field is absent or encoded as null
+         * @throws AmqpErrorException with {@link AmqpError#DECODE_ERROR} if a mandatory field is missing or the
+         *                            converter fails with a runtime exception; exceptions of this type thrown
+         *                            by the converter itself are propagated unchanged
+         */
+        @SuppressWarnings("unchecked")
+        public <F> F[] readArrayValue(final int fieldIndex,
+                                      final String fieldName,
+                                      final boolean mandatory,
+                                      final Class<F> expectedType,
+                                      final Converter<F> converter) throws AmqpErrorException
+        {
+            final Object value = fieldIndex < _count ? _valueHandler.parse(_in) : null;
+            if (value == null)
+            {
+                if (mandatory)
+                {
+                    throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                                 String.format("Mandatory field '%s' of '%s' was not provided",
+                                                               fieldName,
+                                                               getTypeName()));
+                }
+                return null;
+            }
+
+            final Class<?> valueClass = value.getClass();
+            if (valueClass.isArray() && expectedType.isAssignableFrom(valueClass.getComponentType()))
+            {
+                return (F[]) value;
+            }
+
+            try
+            {
+                if (valueClass.isArray())
+                {
+                    // reflective access also copes with primitive arrays, which cannot be cast to Object[]
+                    final int length = Array.getLength(value);
+                    final F[] converted = (F[]) Array.newInstance(expectedType, length);
+                    for (int i = 0; i < length; ++i)
+                    {
+                        converted[i] = converter.convert(Array.get(value, i));
+                    }
+                    return converted;
+                }
+
+                // a single value is wrapped into a one-element array, converting it only when necessary
+                final F[] singleton = (F[]) Array.newInstance(expectedType, 1);
+                singleton[0] = expectedType.isInstance(value) ? (F) value : converter.convert(value);
+                return singleton;
+            }
+            catch (RuntimeException e)
+            {
+                final Error error = new Error(AmqpError.DECODE_ERROR,
+                                              String.format("Could not decode value field '%s' of '%s'",
+                                                            fieldName,
+                                                            getTypeName()));
+                throw new AmqpErrorException(error, e);
+            }
+        }
+    }
+
+    /** Callback turning an arbitrary decoded object into a value of type {@code T}. */
+    public interface Converter<T>
+    {
+        /** Converts {@code o} to {@code T}; may throw {@link AmqpErrorException} to signal a failure. */
+        T convert(Object o) throws AmqpErrorException;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java
index e8a14ff..c8da269 100644
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java
@@ -20,43 +20,29 @@
  */
 package org.apache.qpid.server.protocol.v1_0.codec;
 
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
 
 public abstract class AbstractDescribedTypeConstructor<T extends Object> 
implements DescribedTypeConstructor<T>
 {
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(AbstractDescribedTypeConstructor.class);
-
     @Override
     public TypeConstructor<T> construct(final Object descriptor,
                                         final List<QpidByteBuffer> in,
                                         final int[] originalPositions, final 
ValueHandler valueHandler)
             throws AmqpErrorException
     {
-
         return new TypeConstructorFromUnderlying<>(this, 
valueHandler.readConstructor(in));
     }
 
-    protected abstract T construct(Object underlying);
+    protected abstract T construct(Object underlying) throws 
AmqpErrorException;
 
     private static class TypeConstructorFromUnderlying<S extends Object> 
implements TypeConstructor<S>
     {
 
         private final TypeConstructor _describedConstructor;
         private AbstractDescribedTypeConstructor<S> _describedTypeConstructor;
-        private static final Map<Class<?>, CompositeTypeValidator> _validators 
= new ConcurrentHashMap<>();
 
         public TypeConstructorFromUnderlying(final 
AbstractDescribedTypeConstructor<S> describedTypeConstructor,
                                              final TypeConstructor 
describedConstructor)
@@ -68,57 +54,7 @@ public abstract class AbstractDescribedTypeConstructor<T 
extends Object> impleme
         @Override
         public S construct(final List<QpidByteBuffer> in, final ValueHandler 
handler) throws AmqpErrorException
         {
-            final S constructedObject =
-                    
_describedTypeConstructor.construct(_describedConstructor.construct(in, 
handler));
-            CompositeTypeValidator<S> validator =
-                    _validators.computeIfAbsent(constructedObject.getClass(), 
k -> createValidator(constructedObject));
-            validator.validate(constructedObject);
-            return constructedObject;
-        }
-
-        private CompositeTypeValidator<S> createValidator(final S 
constructedObject)
-        {
-            final List<Field> mandatoryFields = new ArrayList<>();
-            for (Field field : 
constructedObject.getClass().getDeclaredFields())
-            {
-                Annotation[] annotations = 
field.getDeclaredAnnotationsByType(CompositeTypeField.class);
-                for (Annotation annotation : annotations)
-                {
-                    if (annotation instanceof CompositeTypeField && 
((CompositeTypeField) annotation).mandatory())
-                    {
-                        field.setAccessible(true);
-                        mandatoryFields.add(field);
-                    }
-                }
-            }
-            return objectToValidate ->
-            {
-                try
-                {
-                    if (!mandatoryFields.isEmpty())
-                    {
-                        for (Field field : mandatoryFields)
-                        {
-                            if (field.get(objectToValidate) == null)
-                            {
-                                throw new 
AmqpErrorException(AmqpError.DECODE_ERROR,
-                                                             
String.format("Missing mandatory field '%s'.",
-                                                                           
field.getName()));
-                            }
-                        }
-                    }
-                }
-                catch (IllegalAccessException e)
-                {
-                    LOGGER.error(String.format("Error validating AMQP 1.0 
object '%s'", constructedObject.toString()), e);
-                    throw new AmqpErrorException(AmqpError.INTERNAL_ERROR, 
"Failure during object validation");
-                }
-            };
-        }
-
-        private interface CompositeTypeValidator<S>
-        {
-            void validate(final S constructedObject) throws AmqpErrorException;
+            return 
_describedTypeConstructor.construct(_describedConstructor.construct(in, 
handler));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
deleted file mode 100644
index 8d54691..0000000
--- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.protocol.v1_0.codec;
-
-import java.util.ArrayList;
-import java.util.Formatter;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
-import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
-import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
-import org.apache.qpid.server.protocol.v1_0.type.transport.Error;
-
-public class CompoundTypeConstructor<T> extends VariableWidthTypeConstructor<T>
-{
-    private final CompoundTypeAssembler.Factory<T> _assemblerFactory;
-
-    public static final CompoundTypeAssembler.Factory LIST_ASSEMBLER_FACTORY =
-            new CompoundTypeAssembler.Factory()
-            {
-
-                @Override
-                public CompoundTypeAssembler newInstance()
-                {
-                    return new ListAssembler();
-                }
-            };
-
-
-
-    private static class ListAssembler implements CompoundTypeAssembler<List>
-    {
-        private List _list;
-
-        @Override
-        public void init(final int count) throws AmqpErrorException
-        {
-            _list = new ArrayList(count);
-        }
-
-        @Override
-        public void addItem(final Object obj) throws AmqpErrorException
-        {
-            _list.add(obj);
-        }
-
-        @Override
-        public List complete() throws AmqpErrorException
-        {
-            return _list;
-        }
-
-        @Override
-        public String toString()
-        {
-            return "ListAssembler{" +
-                   "_list=" + _list +
-                   '}';
-        }
-    }
-
-
-    public static final CompoundTypeAssembler.Factory MAP_ASSEMBLER_FACTORY =
-            new CompoundTypeAssembler.Factory<Map>()
-            {
-
-                @Override
-                public CompoundTypeAssembler<Map> newInstance()
-                {
-                    return new MapAssembler();
-                }
-            };
-
-    private static class MapAssembler implements CompoundTypeAssembler<Map>
-    {
-        private Map _map;
-        private Object _lastKey;
-        private static final Object NOT_A_KEY = new Object();
-
-
-        @Override
-        public void init(final int count) throws AmqpErrorException
-        {
-            // Can't have an odd number of elements in a map
-            if((count & 0x1) == 1)
-            {
-                Error error = new Error();
-                error.setCondition(AmqpError.DECODE_ERROR);
-                Formatter formatter = new Formatter();
-                formatter.format("map cannot have odd number of elements: %d", 
count);
-                error.setDescription(formatter.toString());
-                throw new AmqpErrorException(error);
-            }
-            _map = new HashMap(count);
-            _lastKey = NOT_A_KEY;
-        }
-
-        @Override
-        public void addItem(final Object obj) throws AmqpErrorException
-        {
-            if(_lastKey != NOT_A_KEY)
-            {
-                if(_map.put(_lastKey, obj) != null)
-                {
-                    Error error = new Error();
-                    error.setCondition(AmqpError.DECODE_ERROR);
-                    Formatter formatter = new Formatter();
-                    formatter.format("map cannot have duplicate keys: %s has 
values (%s, %s)", _lastKey, _map.get(_lastKey), obj);
-                    error.setDescription(formatter.toString());
-
-                    throw new AmqpErrorException(error);
-                }
-                _lastKey = NOT_A_KEY;
-            }
-            else
-            {
-                _lastKey = obj;
-            }
-
-        }
-
-        @Override
-        public Map complete() throws AmqpErrorException
-        {
-            return _map;
-        }
-    }
-
-
-    public static <X> CompoundTypeConstructor<X> getInstance(int i,
-                                                      
CompoundTypeAssembler.Factory<X> assemblerFactory)
-    {
-        return new CompoundTypeConstructor<>(i, assemblerFactory);
-    }
-
-
-    private CompoundTypeConstructor(int size,
-                                    final CompoundTypeAssembler.Factory<T> 
assemblerFactory)
-    {
-        super(size);
-        _assemblerFactory = assemblerFactory;
-    }
-
-    @Override
-    public T construct(final List<QpidByteBuffer> in, final ValueHandler 
handler) throws AmqpErrorException
-    {
-        int size;
-        int count;
-
-        if(getSize() == 1)
-        {
-            size = QpidByteBufferUtils.get(in) & 0xFF;
-            count = QpidByteBufferUtils.get(in) & 0xFF;
-        }
-        else
-        {
-            size = QpidByteBufferUtils.getInt(in);
-            count = QpidByteBufferUtils.getInt(in);
-        }
-
-        CompoundTypeAssembler<T> assembler = _assemblerFactory.newInstance();
-
-        assembler.init(count);
-
-        for(int i = 0; i < count; i++)
-        {
-            assembler.addItem(handler.parse(in));
-        }
-
-        return assembler.complete();
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java
new file mode 100644
index 0000000..87011d2
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+
+public class ListConstructor extends VariableWidthTypeConstructor<List>
+{
+    private ListConstructor(final int size)
+    {
+        super(size);
+    }
+
+    @Override
+    public List construct(final List<QpidByteBuffer> in, final ValueHandler 
handler) throws AmqpErrorException
+    {
+        int size;
+        int count;
+        long remaining = QpidByteBufferUtils.remaining(in);
+        if (remaining < getSize())
+        {
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                         String.format("Not sufficient data 
for deserialization of 'list'."
+                                                       + " Expected at least 
%d bytes. Got %d bytes.",
+                                                       getSize(),
+                                                       remaining));
+        }
+
+        if(getSize() == 1)
+        {
+            size = QpidByteBufferUtils.get(in) & 0xFF;
+            count = QpidByteBufferUtils.get(in) & 0xFF;
+        }
+        else
+        {
+            size = QpidByteBufferUtils.getInt(in);
+            count = QpidByteBufferUtils.getInt(in);
+        }
+        remaining -= getSize();
+        if (remaining < size)
+        {
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                         String.format("Not sufficient data 
for deserialization of 'list'."
+                                                       + " Expected at least 
%d bytes. Got %d bytes.",
+                                                       size,
+                                                       remaining));
+        }
+        return construct(in, handler, size, count);
+    }
+
+    protected List construct(final List<QpidByteBuffer> in,
+                             final ValueHandler handler,
+                             final int size,
+                             final int count)
+            throws AmqpErrorException
+    {
+        List list = new ArrayList(count);
+
+        for(int i = 0; i < count; i++)
+        {
+            list.add(handler.parse(in));
+        }
+
+        return list;
+    }
+
+    public static TypeConstructor getInstance(final int size)
+    {
+        return new ListConstructor(size);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
----------------------------------------------------------------------
diff --git 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
new file mode 100644
index 0000000..4c791e8
--- /dev/null
+++ 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0.codec;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils;
+import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;
+
+public class MapConstructor extends VariableWidthTypeConstructor<Map>
+{
+    private MapConstructor(final int size)
+    {
+        super(size);
+    }
+
+    @Override
+    public Map construct(final List<QpidByteBuffer> in, final ValueHandler 
handler) throws AmqpErrorException
+    {
+        return construct(in, handler, Object.class, Object.class);
+    }
+
+    public <T,S> Map<T, S> construct(final List<QpidByteBuffer> in, final 
ValueHandler handler,
+                         Class<T> keyType,
+                         Class<S> valueType) throws AmqpErrorException
+    {
+        int size;
+        int count;
+
+        long remaining = QpidByteBufferUtils.remaining(in);
+        if (remaining < getSize())
+        {
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                         String.format("Not sufficient data 
for deserialization of 'map'."
+                                                       + " Expected at least 
%d bytes. Got %d bytes.",
+                                                       getSize(),
+                                                       remaining));
+        }
+
+        if(getSize() == 1)
+        {
+            size = QpidByteBufferUtils.get(in) & 0xFF;
+            count = QpidByteBufferUtils.get(in) & 0xFF;
+        }
+        else
+        {
+            size = QpidByteBufferUtils.getInt(in);
+            count = QpidByteBufferUtils.getInt(in);
+        }
+        remaining -= getSize();
+        if (remaining < size)
+        {
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR,
+                                         String.format("Not sufficient data 
for deserialization of 'map'."
+                                                       + " Expected at least 
%d bytes. Got %d bytes.",
+                                                       size,
+                                                       remaining));
+        }
+
+        return construct(in, handler, size, count, keyType, valueType);
+    }
+
+
+    private <T, S> Map<T,S> construct(final List<QpidByteBuffer> in,
+                            final ValueHandler handler,
+                            final int size,
+                            final int count,
+                            Class<T> keyType,
+                            Class<S> valueType)
+            throws AmqpErrorException
+    {
+        // Decodes count/2 key/value pairs, rejecting mistyped or duplicate entries.
+        // Can't have an odd number of elements in a map
+        if ((count & 0x1) == 1)
+        {
+            String message = String.format("Map cannot have odd number of 
elements: %d", count);
+            throw new AmqpErrorException(AmqpError.DECODE_ERROR, message);
+        }
+
+        Map<T, S> map = new LinkedHashMap<>(count);
+
+        final int mapSize = count / 2;
+        for(int i = 0; i < mapSize; i++)
+        {
+            Object key = handler.parse(in);
+            if (key != null && !keyType.isAssignableFrom(key.getClass()))
+            {
+                String message = String.format("Expected key type is '%s' but 
got '%s'",
+                                               keyType.getSimpleName(),
+                                               key.getClass().getSimpleName());
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR, message);
+            }
+
+            Object value = handler.parse(in);
+            if (value != null && !valueType.isAssignableFrom(value.getClass()))
+            {
+                String message = String.format("Expected value type is '%s' but 
got '%s'",
+                                               valueType.getSimpleName(),
+                                               
value.getClass().getSimpleName());
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR, message);
+            }
+
+            // containsKey, not put() != null: catches a duplicate of a null-valued key
+            if (map.containsKey(key))
+            {
+                String message = String.format("Map cannot have duplicate 
keys: %s has values (%s, %s)",
+                                               key,
+                                               map.get(key),
+                                               value);
+                throw new AmqpErrorException(AmqpError.DECODE_ERROR, message);
+            }
+            map.put((T)key, (S)value);
+        }
+        return map;
+    }
+
+
+    public static MapConstructor getInstance(int size)
+    {
+        return new MapConstructor(size);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to