QPID-7932: [Java Broker, AMQP 1.0] Improve error handling when deserializing composite types
The constructors are now auto-generated with generation being driven from annotations. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/6a267175 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/6a267175 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/6a267175 Branch: refs/heads/master Commit: 6a2671755d49f849e69d1392eeaef2c1a9783713 Parents: 85459e5 Author: Lorenz Quack <lqu...@apache.org> Authored: Mon Sep 25 10:36:14 2017 +0100 Committer: Lorenz Quack <lqu...@apache.org> Committed: Thu Sep 28 14:30:17 2017 +0100 ---------------------------------------------------------------------- .../server/protocol/v1_0/CompositeType.java | 34 ++ .../v1_0/CompositeTypeConstructorGenerator.java | 443 ++++++++++++++++++ .../protocol/v1_0/CompositeTypeField.java | 35 ++ .../javax.annotation.processing.Processor | 1 + .../protocol/v1_0/DeserializationFactories.java | 345 ++++++++++++++ .../qpid/server/protocol/v1_0/Session_1_0.java | 86 +--- .../codec/AbstractCompositeTypeConstructor.java | 297 ++++++++++++ .../codec/AbstractDescribedTypeConstructor.java | 68 +-- .../v1_0/codec/CompoundTypeConstructor.java | 193 -------- .../protocol/v1_0/codec/ListConstructor.java | 95 ++++ .../protocol/v1_0/codec/MapConstructor.java | 144 ++++++ .../protocol/v1_0/codec/ValueHandler.java | 6 +- .../protocol/v1_0/framing/FrameHandler.java | 5 + .../protocol/v1_0/type/CompositeTypeField.java | 33 -- .../type/codec/AMQPDescribedTypeRegistry.java | 2 +- .../v1_0/type/messaging/AbstractSection.java | 6 +- .../protocol/v1_0/type/messaging/Accepted.java | 2 + .../type/messaging/AmqpSequenceSection.java | 4 +- .../v1_0/type/messaging/AmqpValueSection.java | 4 +- .../messaging/ApplicationPropertiesSection.java | 4 +- .../v1_0/type/messaging/DataSection.java | 4 +- .../v1_0/type/messaging/DeleteOnClose.java | 2 + .../v1_0/type/messaging/DeleteOnNoLinks.java | 2 + 
.../messaging/DeleteOnNoLinksOrMessages.java | 2 + .../v1_0/type/messaging/DeleteOnNoMessages.java | 2 + .../messaging/DeliveryAnnotationsSection.java | 4 +- .../v1_0/type/messaging/FooterSection.java | 4 +- .../protocol/v1_0/type/messaging/Header.java | 14 +- .../v1_0/type/messaging/HeaderSection.java | 4 +- .../messaging/MessageAnnotationsSection.java | 4 +- .../protocol/v1_0/type/messaging/Modified.java | 16 +- .../v1_0/type/messaging/Properties.java | 30 +- .../v1_0/type/messaging/PropertiesSection.java | 4 +- .../protocol/v1_0/type/messaging/Received.java | 8 +- .../protocol/v1_0/type/messaging/Rejected.java | 6 +- .../protocol/v1_0/type/messaging/Released.java | 2 + .../protocol/v1_0/type/messaging/Source.java | 45 +- .../v1_0/type/messaging/StdDistMode.java | 42 +- .../protocol/v1_0/type/messaging/Target.java | 66 ++- .../v1_0/type/messaging/TerminusDurability.java | 48 +- .../type/messaging/TerminusExpiryPolicy.java | 64 ++- .../messaging/codec/AcceptedConstructor.java | 72 --- .../codec/DeleteOnCloseConstructor.java | 72 --- .../codec/DeleteOnNoLinksConstructor.java | 72 --- .../DeleteOnNoLinksOrMessagesConstructor.java | 72 --- .../codec/DeleteOnNoMessagesConstructor.java | 72 --- .../type/messaging/codec/HeaderConstructor.java | 209 --------- .../messaging/codec/ModifiedConstructor.java | 153 ------ .../messaging/codec/PropertiesConstructor.java | 426 ----------------- .../messaging/codec/ReceivedConstructor.java | 126 ----- .../messaging/codec/RejectedConstructor.java | 98 ---- .../messaging/codec/ReleasedConstructor.java | 71 --- .../type/messaging/codec/SourceConstructor.java | 385 --------------- .../type/messaging/codec/TargetConstructor.java | 270 ----------- .../v1_0/type/security/SaslChallenge.java | 27 +- .../protocol/v1_0/type/security/SaslCode.java | 70 ++- .../protocol/v1_0/type/security/SaslInit.java | 37 +- .../v1_0/type/security/SaslMechanisms.java | 22 +- .../v1_0/type/security/SaslOutcome.java | 33 +- 
.../v1_0/type/security/SaslResponse.java | 27 +- .../codec/SaslChallengeConstructor.java | 99 ---- .../security/codec/SaslInitConstructor.java | 153 ------ .../codec/SaslMechanismsConstructor.java | 106 ----- .../security/codec/SaslOutcomeConstructor.java | 126 ----- .../security/codec/SaslResponseConstructor.java | 99 ---- .../v1_0/type/transaction/Coordinator.java | 24 +- .../protocol/v1_0/type/transaction/Declare.java | 19 +- .../v1_0/type/transaction/Declared.java | 6 +- .../v1_0/type/transaction/Discharge.java | 26 +- .../type/transaction/TransactionErrors.java | 62 +-- .../type/transaction/TransactionalState.java | 8 +- .../v1_0/type/transaction/TxnCapabilities.java | 146 ------ .../v1_0/type/transaction/TxnCapability.java | 75 ++- .../codec/CoordinatorConstructor.java | 117 ----- .../transaction/codec/DeclareConstructor.java | 100 ---- .../transaction/codec/DeclaredConstructor.java | 99 ---- .../transaction/codec/DischargeConstructor.java | 126 ----- .../codec/TransactionalStateConstructor.java | 126 ----- .../protocol/v1_0/type/transport/AmqpError.java | 1 - .../protocol/v1_0/type/transport/Attach.java | 32 +- .../protocol/v1_0/type/transport/Begin.java | 20 +- .../protocol/v1_0/type/transport/Close.java | 6 +- .../v1_0/type/transport/ConnectionError.java | 1 - .../protocol/v1_0/type/transport/Detach.java | 10 +- .../v1_0/type/transport/Disposition.java | 16 +- .../protocol/v1_0/type/transport/End.java | 6 +- .../protocol/v1_0/type/transport/Error.java | 18 +- .../protocol/v1_0/type/transport/Flow.java | 26 +- .../protocol/v1_0/type/transport/LinkError.java | 1 - .../protocol/v1_0/type/transport/Open.java | 24 +- .../v1_0/type/transport/SessionError.java | 1 - .../protocol/v1_0/type/transport/Transfer.java | 26 +- .../type/transport/codec/AttachConstructor.java | 465 ------------------- .../type/transport/codec/BeginConstructor.java | 303 ------------ .../type/transport/codec/CloseConstructor.java | 99 ---- .../type/transport/codec/DetachConstructor.java | 
153 ------ .../transport/codec/DispositionConstructor.java | 234 ---------- .../type/transport/codec/EndConstructor.java | 99 ---- .../type/transport/codec/ErrorConstructor.java | 224 --------- .../type/transport/codec/FlowConstructor.java | 370 --------------- .../type/transport/codec/OpenConstructor.java | 373 --------------- .../transport/codec/TransferConstructor.java | 369 --------------- .../protocol/v1_0/store/LinkStoreTestCase.java | 2 +- .../tests/protocol/v1_0/DecodeErrorTest.java | 196 ++++++++ .../protocol/v1_0/messaging/MessageFormat.java | 49 -- 105 files changed, 2115 insertions(+), 7020 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java ---------------------------------------------------------------------- diff --git a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java new file mode 100644 index 0000000..41281c7 --- /dev/null +++ b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeType.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
/**
 * Marks a class as an AMQP 1.0 composite type for which a constructor class should be generated.
 * <p>
 * The annotation processor in this package reads both descriptors and emits a
 * {@code register(...)} method that registers the generated constructor under each of them.
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface CompositeType
{
    /** The symbolic descriptor under which the generated constructor is registered. */
    String symbolicDescriptor();

    /** The numeric descriptor under which the generated constructor is registered. */
    long numericDescriptor();
}
+ * + */ + +package org.apache.qpid.server.protocol.v1_0; + +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Consumer; + +import javax.annotation.processing.AbstractProcessor; +import javax.annotation.processing.Filer; +import javax.annotation.processing.ProcessingEnvironment; +import javax.annotation.processing.RoundEnvironment; +import javax.lang.model.SourceVersion; +import javax.lang.model.element.AnnotationMirror; +import javax.lang.model.element.AnnotationValue; +import javax.lang.model.element.Element; +import javax.lang.model.element.ElementKind; +import javax.lang.model.element.ExecutableElement; +import javax.lang.model.element.PackageElement; +import javax.lang.model.element.TypeElement; +import javax.lang.model.element.VariableElement; +import javax.lang.model.type.ArrayType; +import javax.lang.model.type.DeclaredType; +import javax.lang.model.type.TypeKind; +import javax.lang.model.type.TypeMirror; +import javax.lang.model.util.Elements; +import javax.lang.model.util.Types; +import javax.tools.Diagnostic; +import javax.tools.JavaFileObject; + +import org.apache.qpid.server.License; + + +public class CompositeTypeConstructorGenerator extends AbstractProcessor +{ + private static final List<String> RESTRICTED_TYPES = Arrays.asList( + "org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError", + "org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError", + "org.apache.qpid.server.protocol.v1_0.type.transport.SessionError", + "org.apache.qpid.server.protocol.v1_0.type.transport.LinkError", + "org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors", + "org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode", + 
"org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode", + "org.apache.qpid.server.protocol.v1_0.type.transport.Role", + "org.apache.qpid.server.protocol.v1_0.type.security.SaslCode", + "org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionDetectionPolicy", + "org.apache.qpid.server.protocol.v1_0.type.extensions.soleconn.SoleConnectionEnforcementPolicy", + "org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode", + "org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability", + "org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy", + "org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability"); + + + @Override + public SourceVersion getSupportedSourceVersion() + { + return SourceVersion.latest(); + } + + @Override + public Set<String> getSupportedAnnotationTypes() + { + return Collections.singleton(CompositeType.class.getName()); + } + + @Override + public boolean process(final Set<? extends TypeElement> annotations, final RoundEnvironment roundEnvironment) + { + if(roundEnvironment.processingOver()) + { + return true; + } + + Filer filer = processingEnv.getFiler(); + try + { + for (Element e : roundEnvironment.getElementsAnnotatedWith(CompositeType.class)) + { + if(e.getKind() == ElementKind.CLASS) + { + generateCompositeTypeConstructor(filer, (TypeElement) e); + } + } + } + catch (Exception e) + { + try(StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw)) + { + e.printStackTrace(pw); + processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Unexpected Error: " + sw.toString()); + } + catch (IOException ioe) + { + processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, "Error: " + ioe.getLocalizedMessage()); + } + } + return true; + } + + + private void generateCompositeTypeConstructor(final Filer filer, final TypeElement typeElement) + { + String objectQualifiedClassName = typeElement.getQualifiedName().toString(); + String 
objectSimpleName = typeElement.getSimpleName().toString(); + String compositeTypeConstructorNameSimpleName = objectSimpleName + "Constructor"; + PackageElement packageElement = (PackageElement) typeElement.getEnclosingElement(); + final String compositeTypeConstructorPackage = packageElement.getQualifiedName() + ".codec"; + String compositeTypeConstructorName = compositeTypeConstructorPackage + "." + compositeTypeConstructorNameSimpleName; + final CompositeType annotation = typeElement.getAnnotation(CompositeType.class); + + processingEnv.getMessager().printMessage(Diagnostic.Kind.NOTE, "Generating composite constructor file for " + objectQualifiedClassName); + + try + { + JavaFileObject factoryFile = filer.createSourceFile(compositeTypeConstructorName); + PrintWriter pw = new PrintWriter(new OutputStreamWriter(factoryFile.openOutputStream(), "UTF-8")); + pw.println("/*"); + for(String headerLine : License.LICENSE) + { + pw.println(" *" + headerLine); + } + pw.println(" */"); + pw.println(); + pw.print("package "); + pw.print(compositeTypeConstructorPackage); + pw.println(";"); + pw.println(); + + pw.println("import java.util.List;"); + pw.println(); + pw.println("import org.apache.qpid.server.protocol.v1_0.codec.AbstractCompositeTypeConstructor;"); + pw.println("import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry;"); + pw.println("import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;"); + pw.println("import org.apache.qpid.server.protocol.v1_0.type.Symbol;"); + pw.println("import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong;"); + pw.println("import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError;"); + pw.println("import org.apache.qpid.server.protocol.v1_0.type.transport.Error;"); + pw.println("import " + objectQualifiedClassName + ";"); + pw.println(); + + pw.println("public final class " + compositeTypeConstructorNameSimpleName + " extends AbstractCompositeTypeConstructor<"+ objectSimpleName 
+">"); + pw.println("{"); + pw.println(" private static final " + compositeTypeConstructorNameSimpleName + " INSTANCE = new " + compositeTypeConstructorNameSimpleName + "();"); + pw.println(); + + pw.println(" public static void register(DescribedTypeConstructorRegistry registry)"); + pw.println(" {"); + pw.println(" registry.register(Symbol.valueOf(\"" + annotation.symbolicDescriptor() + "\"), INSTANCE);"); + pw.println(String.format(" registry.register(UnsignedLong.valueOf(%#016x), INSTANCE);", annotation.numericDescriptor())); + pw.println(" }"); + pw.println(); + + pw.println(" @Override"); + pw.println(" protected String getTypeName()"); + pw.println(" {"); + pw.println(" return " + objectSimpleName + ".class.getSimpleName();"); + pw.println(" }"); + pw.println(); + + pw.println(" @Override"); + pw.println(" protected " + objectSimpleName + " construct(final FieldValueReader fieldValueReader) throws AmqpErrorException"); + pw.println(" {"); + pw.println(" " + objectSimpleName + " obj = new " + objectSimpleName + "();"); + pw.println(); + generateAssigners(pw, typeElement); + pw.println(" return obj;"); + pw.println(" }"); + pw.println("}"); + pw.close(); + } + catch (IOException e) + { + processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, + "Failed to write composite constructor file: " + + compositeTypeConstructorName + + " - " + + e.getLocalizedMessage()); + } + } + + private void generateAssigners(final PrintWriter pw, final TypeElement typeElement) + { + Types typeUtils = processingEnv.getTypeUtils(); + + final List<AnnotatedField> annotatedFields = new ArrayList<>(); + for (Element element : typeElement.getEnclosedElements()) + { + if (element instanceof VariableElement && element.getKind() == ElementKind.FIELD) + { + boolean annotationFound = false; + for(AnnotationMirror annotationMirror : element.getAnnotationMirrors()) + { + 
if(annotationMirror.getAnnotationType().toString().equals("org.apache.qpid.server.protocol.v1_0.CompositeTypeField")) + { + if (annotationFound) + { + processingEnv.getMessager() + .printMessage(Diagnostic.Kind.ERROR, + String.format( + "More than one CompositeTypeField annotations on field '%s.%s'", + typeElement.getSimpleName(), + element.getSimpleName())); + } + annotationFound = true; + annotatedFields.add(new AnnotatedField((VariableElement) element, annotationMirror)); + } + } + } + } + + annotatedFields.sort(Comparator.comparingInt(AnnotatedField::getIndex)); + + for (int index = 0; index < annotatedFields.size(); ++index) + { + AnnotatedField annotatedField = annotatedFields.get(index); + final VariableElement variableElement = annotatedField.getVariableElement(); + final String fieldName = stripUnderscore(variableElement.getSimpleName().toString()); + if (annotatedField.getIndex() != index) + { + processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR,String.format( + "Unexpected CompositeTypeField index '%d' is specified on field '%s' of '%s'. 
Expected %d.", + annotatedField.getIndex(), + fieldName, + typeElement.getSimpleName(), + index)); + } + + final String baseIndent = " "; + if (variableElement.asType().getKind() == TypeKind.ARRAY) + { + final TypeMirror componentType = ((ArrayType) variableElement.asType()).getComponentType(); + final String functionString; + if (annotatedField.getFactory() != null) + { + functionString = "x -> " + annotatedField.getFactory() + "(x)"; + } + else if (RESTRICTED_TYPES.contains(componentType)) + { + functionString = variableElement.asType().toString() + "::valueOf"; + } + else + { + functionString = "x -> (" + componentType + ") x"; + } + pw.println(String.format(" %s %s = fieldValueReader.readArrayValue(%d, \"%s\", %s, %s.class, %s);", + annotatedField.getVariableElement().asType(), + fieldName, + index, + fieldName, + annotatedField.isMandatory(), + componentType, + functionString)); + optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> { + pw.println(indent + "obj." + getSetterName(variableElement) + "(" + fieldName + ");"); + }); + } + else if (annotatedField.getFactory() != null || RESTRICTED_TYPES.contains(variableElement.asType().toString())) + { + String functionName = annotatedField.getFactory() != null ? annotatedField.getFactory() : variableElement.asType().toString() + ".valueOf"; + pw.println(String.format(" Object %s = fieldValueReader.readValue(%d, \"%s\", %s, Object.class);", + fieldName, + index, + fieldName, + annotatedField.isMandatory())); + optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> { + pw.println(indent + "try"); + pw.println(indent + "{"); + pw.println(indent + " obj." 
+ getSetterName(variableElement) + "(" + functionName + "(" + fieldName + "));"); + pw.println(indent + "}"); + pw.println(indent + "catch (RuntimeException e)"); + pw.println(indent + "{"); + pw.println(indent + " Error error = new Error(AmqpError.DECODE_ERROR, \"Could not decode value field '" + fieldName + "' of '" + typeElement.getSimpleName() + "'\");"); + pw.println(indent + " throw new AmqpErrorException(error, e);"); + pw.println(indent + "}"); + }); + } + else if (typeUtils.isSameType(typeUtils.erasure(variableElement.asType()), + getErasure(processingEnv, "java.util.Map"))) + { + List<? extends TypeMirror> args = ((DeclaredType) variableElement.asType()).getTypeArguments(); + if (args.size() != 2) + { + processingEnv.getMessager().printMessage(Diagnostic.Kind.ERROR, + "Map types must have exactly two type arguments"); + } + pw.println(String.format(" %s %s = fieldValueReader.readMapValue(%d, \"%s\", %s, %s.class, %s.class);", + annotatedField.getVariableElement().asType(), + fieldName, + index, + fieldName, + annotatedField.isMandatory(), + args.get(0), + args.get(1))); + optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> { + pw.println(indent + "obj." + getSetterName(variableElement) + "(" + fieldName + ");"); + }); + } + else + { + pw.println(String.format(" %s %s = fieldValueReader.readValue(%d, \"%s\", %s, %s.class);", + annotatedField.getVariableElement().asType(), + fieldName, + index, + fieldName, + annotatedField.isMandatory(), + annotatedField.getVariableElement().asType())); + optionallyWrapInNullCheck(!annotatedField.isMandatory(), pw, baseIndent, fieldName, indent -> { + pw.println(indent + "obj." 
+ getSetterName(variableElement) + "(" + fieldName + ");"); + }); + } + + pw.println(); + } + } + + private void optionallyWrapInNullCheck(boolean wrap, PrintWriter pw, String indent, String fieldName, Consumer<String> f) + { + if (wrap) + { + pw.println(indent + "if (" + fieldName + " != null)"); + pw.println(indent + "{"); + indent += " "; + } + f.accept(indent); + if (wrap) + { + indent = indent.substring(4); + pw.println(indent + "}"); + } + } + + private String getSetterName(final VariableElement variableElement) + { + final String fieldName = stripUnderscore(variableElement.getSimpleName().toString()); + return "set" + fieldName.substring(0, 1).toUpperCase() + fieldName.substring(1); + } + + private String stripUnderscore(final String fieldName) + { + if (fieldName.startsWith("_")) + { + return fieldName.substring(1); + } + return fieldName; + } + + private static class AnnotatedField + { + private final VariableElement _variableElement; + private final AnnotationMirror _annotationMirror; + private final int _index; + private final String _factory; + private final boolean _mandatory; + + public AnnotatedField(final VariableElement variableElement, final AnnotationMirror annotationMirror) + { + _variableElement = variableElement; + _annotationMirror = annotationMirror; + String factory = null; + boolean mandatory = false; + int index = -1; + for (Map.Entry<? extends ExecutableElement,? 
extends AnnotationValue> entry : annotationMirror.getElementValues().entrySet()) + { + if ("index".contentEquals(entry.getKey().getSimpleName())) + { + index = (int) entry.getValue().getValue(); + } + else if ("deserializationConverter".contentEquals(entry.getKey().getSimpleName())) + { + factory = (String) entry.getValue().getValue(); + } + else if ("mandatory".contentEquals(entry.getKey().getSimpleName())) + { + mandatory = (boolean) entry.getValue().getValue(); + } + } + _index = index; + _mandatory = mandatory; + _factory = factory; + } + + public VariableElement getVariableElement() + { + return _variableElement; + } + + public AnnotationMirror getAnnotationMirror() + { + return _annotationMirror; + } + + public int getIndex() + { + return _index; + } + + public String getFactory() + { + return _factory; + } + + public boolean isMandatory() + { + return _mandatory; + } + } + + private static TypeMirror getErasure(ProcessingEnvironment processingEnv, final String className) + { + final Types typeUtils = processingEnv.getTypeUtils(); + final Elements elementUtils = processingEnv.getElementUtils(); + return typeUtils.erasure(elementUtils.getTypeElement(className).asType()); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java ---------------------------------------------------------------------- diff --git a/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java new file mode 100644 index 0000000..be2893e --- /dev/null +++ b/broker-codegen/src/main/java/org/apache/qpid/server/protocol/v1_0/CompositeTypeField.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
/**
 * Marks a field of a {@code CompositeType}-annotated class as one of the composite's fields.
 * <p>
 * The constructor generator in this package reads the members by name and emits, per annotated
 * field, code that reads the value at {@link #index()} and passes it to the field's setter.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface CompositeTypeField
{
    /** Position of the field within the composite; the generator expects indices 0..n-1 without gaps. */
    int index();

    /** Whether the field is mandatory; handed through to the generated read call. Defaults to {@code false}. */
    boolean mandatory() default false;

    /**
     * Name of a function the generated code calls with the raw value to convert it before it is
     * set on the object. Defaults to the empty string.
     */
    String deserializationConverter() default "";
}
org.apache.qpid.server.model.validation.OperationAnnotationValidator org.apache.qpid.server.model.validation.ContentHeaderAnnotationValidator +org.apache.qpid.server.protocol.v1_0.CompositeTypeConstructorGenerator http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java new file mode 100644 index 0000000..f76edf4 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeserializationFactories.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.server.protocol.v1_0; + +import java.lang.reflect.Array; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.DistributionMode; +import org.apache.qpid.server.protocol.v1_0.type.ErrorCondition; +import org.apache.qpid.server.protocol.v1_0.type.LifetimePolicy; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.TxnCapability; +import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode; +import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionErrors; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; +import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError; +import org.apache.qpid.server.protocol.v1_0.type.transport.LinkError; +import org.apache.qpid.server.protocol.v1_0.type.transport.SessionError; + +public class DeserializationFactories +{ + @SuppressWarnings("unused") + public static Map<Symbol, Object> convertToNodeProperties(final Object value) throws AmqpErrorException + { + if (value != null) + { + if (!(value instanceof Map)) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Cannot construct 'node-properties' from type '%s'", + value.getClass().getSimpleName())); + } + Map<Symbol, Object> nodeProperties = new LinkedHashMap<>(); + Map<?, ?> map = (Map<?, ?>) value; + for (Map.Entry<?,?> entry : map.entrySet()) + { + Object key = entry.getKey(); + if (!(key instanceof Symbol)) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("'node-properties' must have only keys of type 'symbol' but got '%s'", + key.getClass().getSimpleName())); + } + if (Session_1_0.LIFETIME_POLICY.equals(key)) + { + final Object lifetimePolicy = entry.getValue(); + if (!(lifetimePolicy instanceof LifetimePolicy)) + { + String typeName = lifetimePolicy == null ? 
null : lifetimePolicy.getClass().getSimpleName(); + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Cannot construct 'lifetime-policy' from type '%s'", + typeName)); + } + nodeProperties.put((Symbol) key, lifetimePolicy); + } + else if (Symbol.valueOf("supported-dist-modes").equals(key)) + { + final Object distributionMode = entry.getValue(); + final DistributionMode[] converted; + if (distributionMode == null) + { + converted = null; + + } + else if (distributionMode.getClass().isArray()) + { + converted = new DistributionMode[Array.getLength(distributionMode)]; + for (int i = 0; i < converted.length; ++i) + { + final Object item = Array.get(distributionMode, i); + if (item == null) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + "'null' not allowed in 'supported-distribution-modes'"); + } + converted[i] = convertToDistributionMode(item); + } + } + else + { + converted = new DistributionMode[] {convertToDistributionMode(distributionMode)}; + } + nodeProperties.put((Symbol) key, converted); + } + else + { + nodeProperties.put((Symbol) key, entry.getValue()); + } + } + return nodeProperties; + } + else + { + return null; + } + } + + @SuppressWarnings("unused") + public static DistributionMode convertToDistributionMode(final Object value) throws AmqpErrorException + { + DistributionMode distributionMode = null; + if (value != null) + { + if (value instanceof DistributionMode) + { + distributionMode = (DistributionMode) value; + } + else if (value instanceof Symbol) + { + distributionMode = StdDistMode.valueOf(value); + if (distributionMode == null) + { + distributionMode = new UnknownDistributionMode((Symbol) value); + } + } + else + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Cannot construct 'distribution-mode' from type '%s'", + value.getClass().getSimpleName())); + } + } + return distributionMode; + } + + @SuppressWarnings("unused") + public static TxnCapability convertToTxnCapability(final Object 
value) throws AmqpErrorException + { + TxnCapability capability = null; + if (value != null) + { + if (value instanceof TxnCapability) + { + capability = (TxnCapability) value; + } + else if (value instanceof Symbol) + { + capability = org.apache.qpid.server.protocol.v1_0.type.transaction.TxnCapability.valueOf(value); + if (capability == null) + { + capability = new UnknownTxnCapability((Symbol) value); + } + } + else + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Cannot construct 'txn-capability' from type '%s'", + value.getClass().getSimpleName())); + } + } + return capability; + } + + @SuppressWarnings("unsued") + public static ErrorCondition convertToErrorCondition(final Object value) throws AmqpErrorException + { + ErrorCondition condition = null; + if (value != null) + { + if (value instanceof ErrorCondition) + { + condition = (ErrorCondition) value; + } + else if (value instanceof Symbol) + { + condition = AmqpError.valueOf(value); + if (condition == null) + { + condition = ConnectionError.valueOf(value); + if (condition == null) + { + condition = SessionError.valueOf(value); + if (condition == null) + { + condition = LinkError.valueOf(value); + if (condition == null) + { + condition = TransactionErrors.valueOf(value); + if (condition == null) + { + condition = new UnknownErrorCondition((Symbol) value); + } + } + } + } + } + } + else + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Cannot construct 'error-condition' from type '%s'", + value.getClass().getSimpleName())); + } + } + return condition; + } + + private static final class UnknownErrorCondition implements ErrorCondition + { + private final Symbol _value; + + public UnknownErrorCondition(final Symbol value) + { + _value = value; + } + + @Override + public Symbol getValue() + { + return _value; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + 
return false; + } + + final UnknownErrorCondition that = (UnknownErrorCondition) o; + + if (!_value.equals(that._value)) + { + return false; + } + + return true; + } + + @Override + public int hashCode() + { + return _value.hashCode(); + } + + @Override + public String toString() + { + return _value.toString(); + } + } + + private static class UnknownTxnCapability implements TxnCapability + { + private final Symbol _value; + + public UnknownTxnCapability(final Symbol value) + { + _value = value; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final UnknownTxnCapability that = (UnknownTxnCapability) o; + + return _value.equals(that._value); + } + + @Override + public int hashCode() + { + return _value.hashCode(); + } + + @Override + public String toString() + { + return _value.toString(); + } + } + + private static class UnknownDistributionMode implements DistributionMode + { + private final Symbol _value; + + public UnknownDistributionMode(final Symbol value) + { + _value = value; + } + + @Override + public boolean equals(final Object o) + { + if (this == o) + { + return true; + } + if (o == null || getClass() != o.getClass()) + { + return false; + } + + final UnknownDistributionMode that = (UnknownDistributionMode) o; + + return _value.equals(that._value); + } + + @Override + public int hashCode() + { + return _value.hashCode(); + } + + @Override + public String toString() + { + return _value.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 801810e..a78df1a 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -988,15 +988,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget @Override public void block(final Queue<?> queue) { - getAMQPConnection().doOnIOThreadAsync( - new Runnable() - { - @Override - public void run() - { - doBlock(queue); - } - }); + getAMQPConnection().doOnIOThreadAsync(() -> doBlock(queue)); } private void doBlock(final Queue<?> queue) @@ -1025,15 +1017,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget @Override public void unblock(final Queue<?> queue) { - getAMQPConnection().doOnIOThreadAsync( - new Runnable() - { - @Override - public void run() - { - doUnblock(queue); - } - }); + getAMQPConnection().doOnIOThreadAsync(() -> doUnblock(queue)); } private void doUnblock(final Queue<?> queue) @@ -1058,15 +1042,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget @Override public void block() { - getAMQPConnection().doOnIOThreadAsync( - new Runnable() - { - @Override - public void run() - { - doBlock(); - } - }); + getAMQPConnection().doOnIOThreadAsync(this::doBlock); } private void doBlock() @@ -1088,15 +1064,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget @Override public void unblock() { - getAMQPConnection().doOnIOThreadAsync( - new Runnable() - { - @Override - public void run() - { - doUnblock(); - } - }); + getAMQPConnection().doOnIOThreadAsync(this::doUnblock); } private void doUnblock() @@ -1388,38 +1356,32 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget @Override public void onSuccess(final T endpoint) { - doOnIOThreadAsync(new 
Runnable() - { - @Override - public void run() + doOnIOThreadAsync(() -> { + _associatedLinkEndpoints.add(endpoint); + endpoint.setLocalHandle(findNextAvailableOutputHandle()); + if (endpoint instanceof ErrantLinkEndpoint) { - _associatedLinkEndpoints.add(endpoint); - endpoint.setLocalHandle(findNextAvailableOutputHandle()); - if (endpoint instanceof ErrantLinkEndpoint) + endpoint.sendAttach(); + ((ErrantLinkEndpoint) endpoint).closeWithError(); + } + else + { + if (endpoint instanceof StandardReceivingLinkEndpoint + && (_blockingEntities.contains(Session_1_0.this) + || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination()))) + { + endpoint.setStopped(true); + } + _inputHandleToEndpoint.put(_attach.getHandle(), endpoint); + if (!_endpointToOutputHandle.containsKey(endpoint)) { + _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle()); endpoint.sendAttach(); - ((ErrantLinkEndpoint) endpoint).closeWithError(); + endpoint.start(); } else { - if (endpoint instanceof StandardReceivingLinkEndpoint - && (_blockingEntities.contains(Session_1_0.this) - || _blockingEntities.contains(((StandardReceivingLinkEndpoint) endpoint).getReceivingDestination()))) - { - endpoint.setStopped(true); - } - _inputHandleToEndpoint.put(_attach.getHandle(), endpoint); - if (!_endpointToOutputHandle.containsKey(endpoint)) - { - _endpointToOutputHandle.put(endpoint, endpoint.getLocalHandle()); - endpoint.sendAttach(); - endpoint.start(); - } - else - { - // TODO - link stealing??? - } - + // TODO - link stealing??? 
} } }); http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java new file mode 100644 index 0000000..c417b69 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractCompositeTypeConstructor.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.server.protocol.v1_0.codec; + + +import java.lang.reflect.Array; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; + +public abstract class AbstractCompositeTypeConstructor<T> implements DescribedTypeConstructor<T> +{ + @Override + public TypeConstructor<T> construct(final Object descriptor, + final List<QpidByteBuffer> in, + final int[] originalPositions, + final ValueHandler valueHandler) throws AmqpErrorException + { + return new FieldValueReader(); + } + + protected abstract String getTypeName(); + + protected abstract T construct(FieldValueReader x) throws AmqpErrorException; + + protected class FieldValueReader implements TypeConstructor<T> + { + private List<QpidByteBuffer> _in; + private ValueHandler _valueHandler; + private int _count; + + @Override + public T construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + { + _in = in; + _valueHandler = handler; + return constructType(); + } + + private T constructType() throws AmqpErrorException + { + final TypeConstructor typeConstructor = _valueHandler.readConstructor(_in); + if (typeConstructor instanceof ListConstructor) + { + ListConstructor listConstructor = (ListConstructor) typeConstructor; + int size; + long remaining = QpidByteBufferUtils.remaining(_in); + if (remaining < listConstructor.getSize()) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Not sufficient data for deserialization of '%s'." + + " Expected at least %d bytes. 
Got %d bytes.", + getTypeName(), + listConstructor.getSize(), + remaining)); + } + + if (listConstructor.getSize() == 1) + { + size = QpidByteBufferUtils.get(_in) & 0xFF; + _count = QpidByteBufferUtils.get(_in) & 0xFF; + } + else + { + size = QpidByteBufferUtils.getInt(_in); + _count = QpidByteBufferUtils.getInt(_in); + } + + remaining -= listConstructor.getSize(); + if (remaining < size) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Not sufficient data for deserialization of '%s'." + + " Expected at least %d bytes. Got %d bytes.", + getTypeName(), + size, + remaining)); + } + } + else if (typeConstructor instanceof ZeroListConstructor) + { + _count = 0; + } + else + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Unexpected format when deserializing of '%s'", + getTypeName())); + } + return AbstractCompositeTypeConstructor.this.construct(this); + } + + + public <F> F readValue(final int fieldIndex, + final String fieldName, + final boolean mandatory, + final Class<F> expectedType) throws AmqpErrorException + { + if (fieldIndex >= _count) + { + if (mandatory) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Mandatory field '%s' of '%s' was not provided", + fieldName, + getTypeName())); + } + return null; + } + + Object value = _valueHandler.parse(_in); + + + if (value == null && mandatory) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Mandatory field '%s' of '%s' was not provided", + fieldName, + getTypeName())); + } + + if (value != null && !expectedType.isAssignableFrom(value.getClass())) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format( + "Wrong type for field '%s' of '%s'. 
Expected '%s' but got '%s'.", + fieldName, + getTypeName(), + expectedType.getSimpleName(), + value.getClass().getSimpleName())); + } + + return (F) value; + } + + public <K, V> Map<K, V> readMapValue(final int fieldIndex, + final String fieldName, + final boolean mandatory, + final Class<K> expectedKeyType, + final Class<V> expectedValueType) + throws AmqpErrorException + { + if (fieldIndex >= _count) + { + if (mandatory) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Mandatory field '%s' of '%s' was not provided", + fieldName, + getTypeName())); + } + return null; + } + + TypeConstructor typeConstructor = _valueHandler.readConstructor(_in); + if (typeConstructor instanceof MapConstructor) + { + MapConstructor mapConstructor = ((MapConstructor) typeConstructor); + + return mapConstructor.construct(_in, + _valueHandler, + expectedKeyType, + expectedValueType); + } + else if (typeConstructor instanceof NullTypeConstructor) + { + if (mandatory) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + "Mandatory field '%s' of '%s' was not provided", + fieldName, + getTypeName()); + } + } + else + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Could not decode value field '%s' of '%s'", + fieldName, + getTypeName())); + } + + return null; + } + + public <F> F[] readArrayValue(final int fieldIndex, + final String fieldName, + final boolean mandatory, + final Class<F> expectedType, + final Converter<F> converter) throws AmqpErrorException + { + if (fieldIndex >= _count) + { + if (mandatory) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Mandatory field '%s' of '%s' was not provided", + fieldName, + getTypeName())); + } + return null; + } + + Object value = _valueHandler.parse(_in); + + if (mandatory && value == null) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Mandatory field '%s' of '%s' was not provided", + fieldName, + getTypeName())); + } + + if 
(value != null) + { + if (value.getClass().isArray()) + { + if (expectedType.isAssignableFrom(value.getClass().getComponentType())) + { + return (F[]) value; + } + else + { + final Object[] objects = (Object[]) value; + F[] array = (F[]) Array.newInstance(expectedType, objects.length); + try + { + for (int i = 0; i < objects.length; ++i) + { + array[i] = converter.convert(objects[i]); + } + } + catch (RuntimeException e) + { + Error error = new Error(AmqpError.DECODE_ERROR, + String.format("Could not decode value field '%s' of '%s'", fieldName, getTypeName())); + throw new AmqpErrorException(error, e); + } + return array; + } + } + else if (expectedType.isAssignableFrom(value.getClass())) + { + F[] array = (F[]) Array.newInstance(expectedType, 1); + array[0] = (F) value; + return array; + } + else + { + try + { + final F convertedValue = converter.convert(value); + F[] array = (F[]) Array.newInstance(expectedType, 1); + array[0] = convertedValue; + return array; + } + catch (RuntimeException e) + { + Error error = new Error(AmqpError.DECODE_ERROR, + String.format("Could not decode value field '%s' of '%s'", fieldName, getTypeName())); + throw new AmqpErrorException(error, e); + } + } + } + + return null; + } + } + + public interface Converter<T> + { + T convert(Object o) throws AmqpErrorException; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java index e8a14ff..c8da269 100644 --- 
a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/AbstractDescribedTypeConstructor.java @@ -20,43 +20,29 @@ */ package org.apache.qpid.server.protocol.v1_0.codec; -import java.lang.annotation.Annotation; -import java.lang.reflect.Field; -import java.util.ArrayList; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; -import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField; -import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; public abstract class AbstractDescribedTypeConstructor<T extends Object> implements DescribedTypeConstructor<T> { - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDescribedTypeConstructor.class); - @Override public TypeConstructor<T> construct(final Object descriptor, final List<QpidByteBuffer> in, final int[] originalPositions, final ValueHandler valueHandler) throws AmqpErrorException { - return new TypeConstructorFromUnderlying<>(this, valueHandler.readConstructor(in)); } - protected abstract T construct(Object underlying); + protected abstract T construct(Object underlying) throws AmqpErrorException; private static class TypeConstructorFromUnderlying<S extends Object> implements TypeConstructor<S> { private final TypeConstructor _describedConstructor; private AbstractDescribedTypeConstructor<S> _describedTypeConstructor; - private static final Map<Class<?>, CompositeTypeValidator> _validators = new ConcurrentHashMap<>(); public TypeConstructorFromUnderlying(final AbstractDescribedTypeConstructor<S> describedTypeConstructor, final TypeConstructor describedConstructor) @@ -68,57 +54,7 @@ public 
abstract class AbstractDescribedTypeConstructor<T extends Object> impleme @Override public S construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException { - final S constructedObject = - _describedTypeConstructor.construct(_describedConstructor.construct(in, handler)); - CompositeTypeValidator<S> validator = - _validators.computeIfAbsent(constructedObject.getClass(), k -> createValidator(constructedObject)); - validator.validate(constructedObject); - return constructedObject; - } - - private CompositeTypeValidator<S> createValidator(final S constructedObject) - { - final List<Field> mandatoryFields = new ArrayList<>(); - for (Field field : constructedObject.getClass().getDeclaredFields()) - { - Annotation[] annotations = field.getDeclaredAnnotationsByType(CompositeTypeField.class); - for (Annotation annotation : annotations) - { - if (annotation instanceof CompositeTypeField && ((CompositeTypeField) annotation).mandatory()) - { - field.setAccessible(true); - mandatoryFields.add(field); - } - } - } - return objectToValidate -> - { - try - { - if (!mandatoryFields.isEmpty()) - { - for (Field field : mandatoryFields) - { - if (field.get(objectToValidate) == null) - { - throw new AmqpErrorException(AmqpError.DECODE_ERROR, - String.format("Missing mandatory field '%s'.", - field.getName())); - } - } - } - } - catch (IllegalAccessException e) - { - LOGGER.error(String.format("Error validating AMQP 1.0 object '%s'", constructedObject.toString()), e); - throw new AmqpErrorException(AmqpError.INTERNAL_ERROR, "Failure during object validation"); - } - }; - } - - private interface CompositeTypeValidator<S> - { - void validate(final S constructedObject) throws AmqpErrorException; + return _describedTypeConstructor.construct(_describedConstructor.construct(in, handler)); } } } 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java deleted file mode 100644 index 8d54691..0000000 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/CompoundTypeConstructor.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -package org.apache.qpid.server.protocol.v1_0.codec; - -import java.util.ArrayList; -import java.util.Formatter; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.qpid.server.bytebuffer.QpidByteBuffer; -import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; -import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; -import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; -import org.apache.qpid.server.protocol.v1_0.type.transport.Error; - -public class CompoundTypeConstructor<T> extends VariableWidthTypeConstructor<T> -{ - private final CompoundTypeAssembler.Factory<T> _assemblerFactory; - - public static final CompoundTypeAssembler.Factory LIST_ASSEMBLER_FACTORY = - new CompoundTypeAssembler.Factory() - { - - @Override - public CompoundTypeAssembler newInstance() - { - return new ListAssembler(); - } - }; - - - - private static class ListAssembler implements CompoundTypeAssembler<List> - { - private List _list; - - @Override - public void init(final int count) throws AmqpErrorException - { - _list = new ArrayList(count); - } - - @Override - public void addItem(final Object obj) throws AmqpErrorException - { - _list.add(obj); - } - - @Override - public List complete() throws AmqpErrorException - { - return _list; - } - - @Override - public String toString() - { - return "ListAssembler{" + - "_list=" + _list + - '}'; - } - } - - - public static final CompoundTypeAssembler.Factory MAP_ASSEMBLER_FACTORY = - new CompoundTypeAssembler.Factory<Map>() - { - - @Override - public CompoundTypeAssembler<Map> newInstance() - { - return new MapAssembler(); - } - }; - - private static class MapAssembler implements CompoundTypeAssembler<Map> - { - private Map _map; - private Object _lastKey; - private static final Object NOT_A_KEY = new Object(); - - - @Override - public void init(final int count) throws AmqpErrorException - { - // Can't have an odd number of elements in a map - if((count & 0x1) 
== 1) - { - Error error = new Error(); - error.setCondition(AmqpError.DECODE_ERROR); - Formatter formatter = new Formatter(); - formatter.format("map cannot have odd number of elements: %d", count); - error.setDescription(formatter.toString()); - throw new AmqpErrorException(error); - } - _map = new HashMap(count); - _lastKey = NOT_A_KEY; - } - - @Override - public void addItem(final Object obj) throws AmqpErrorException - { - if(_lastKey != NOT_A_KEY) - { - if(_map.put(_lastKey, obj) != null) - { - Error error = new Error(); - error.setCondition(AmqpError.DECODE_ERROR); - Formatter formatter = new Formatter(); - formatter.format("map cannot have duplicate keys: %s has values (%s, %s)", _lastKey, _map.get(_lastKey), obj); - error.setDescription(formatter.toString()); - - throw new AmqpErrorException(error); - } - _lastKey = NOT_A_KEY; - } - else - { - _lastKey = obj; - } - - } - - @Override - public Map complete() throws AmqpErrorException - { - return _map; - } - } - - - public static <X> CompoundTypeConstructor<X> getInstance(int i, - CompoundTypeAssembler.Factory<X> assemblerFactory) - { - return new CompoundTypeConstructor<>(i, assemblerFactory); - } - - - private CompoundTypeConstructor(int size, - final CompoundTypeAssembler.Factory<T> assemblerFactory) - { - super(size); - _assemblerFactory = assemblerFactory; - } - - @Override - public T construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException - { - int size; - int count; - - if(getSize() == 1) - { - size = QpidByteBufferUtils.get(in) & 0xFF; - count = QpidByteBufferUtils.get(in) & 0xFF; - } - else - { - size = QpidByteBufferUtils.getInt(in); - count = QpidByteBufferUtils.getInt(in); - } - - CompoundTypeAssembler<T> assembler = _assemblerFactory.newInstance(); - - assembler.init(count); - - for(int i = 0; i < count; i++) - { - assembler.addItem(handler.parse(in)); - } - - return assembler.complete(); - } -} 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java new file mode 100644 index 0000000..87011d2 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/ListConstructor.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.server.protocol.v1_0.codec; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; + +public class ListConstructor extends VariableWidthTypeConstructor<List> +{ + private ListConstructor(final int size) + { + super(size); + } + + @Override + public List construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + { + int size; + int count; + long remaining = QpidByteBufferUtils.remaining(in); + if (remaining < getSize()) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Not sufficient data for deserialization of 'list'." + + " Expected at least %d bytes. Got %d bytes.", + getSize(), + remaining)); + } + + if(getSize() == 1) + { + size = QpidByteBufferUtils.get(in) & 0xFF; + count = QpidByteBufferUtils.get(in) & 0xFF; + } + else + { + size = QpidByteBufferUtils.getInt(in); + count = QpidByteBufferUtils.getInt(in); + } + remaining -= getSize(); + if (remaining < size) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Not sufficient data for deserialization of 'list'." + + " Expected at least %d bytes. 
Got %d bytes.", + size, + remaining)); + } + return construct(in, handler, size, count); + } + + protected List construct(final List<QpidByteBuffer> in, + final ValueHandler handler, + final int size, + final int count) + throws AmqpErrorException + { + List list = new ArrayList(count); + + for(int i = 0; i < count; i++) + { + list.add(handler.parse(in)); + } + + return list; + } + + public static TypeConstructor getInstance(final int size) + { + return new ListConstructor(size); + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/6a267175/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java new file mode 100644 index 0000000..4c791e8 --- /dev/null +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/codec/MapConstructor.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.server.protocol.v1_0.codec; + +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.bytebuffer.QpidByteBufferUtils; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; + +public class MapConstructor extends VariableWidthTypeConstructor<Map> +{ + private MapConstructor(final int size) + { + super(size); + } + + @Override + public Map construct(final List<QpidByteBuffer> in, final ValueHandler handler) throws AmqpErrorException + { + return construct(in, handler, Object.class, Object.class); + } + + public <T,S> Map<T, S> construct(final List<QpidByteBuffer> in, final ValueHandler handler, + Class<T> keyType, + Class<S> valueType) throws AmqpErrorException + { + int size; + int count; + + long remaining = QpidByteBufferUtils.remaining(in); + if (remaining < getSize()) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Not sufficient data for deserialization of 'map'." + + " Expected at least %d bytes. Got %d bytes.", + getSize(), + remaining)); + } + + if(getSize() == 1) + { + size = QpidByteBufferUtils.get(in) & 0xFF; + count = QpidByteBufferUtils.get(in) & 0xFF; + } + else + { + size = QpidByteBufferUtils.getInt(in); + count = QpidByteBufferUtils.getInt(in); + } + remaining -= getSize(); + if (remaining < size) + { + throw new AmqpErrorException(AmqpError.DECODE_ERROR, + String.format("Not sufficient data for deserialization of 'map'." + + " Expected at least %d bytes. 
Got %d bytes.", + size, + remaining)); + } + + return construct(in, handler, size, count, keyType, valueType); + } + + + private <T, S> Map<T,S> construct(final List<QpidByteBuffer> in, + final ValueHandler handler, + final int size, + final int count, + Class<T> keyType, + Class<S> valueType) + throws AmqpErrorException + { + + // Can't have an odd number of elements in a map + if ((count & 0x1) == 1) + { + String message = String.format("Map cannot have odd number of elements: %d", count); + throw new AmqpErrorException(AmqpError.DECODE_ERROR, message); + } + + Map<T, S> map = new LinkedHashMap<>(count); + + final int mapSize = count / 2; + for(int i = 0; i < mapSize; i++) + { + Object key = handler.parse(in); + if (key != null && !keyType.isAssignableFrom(key.getClass())) + { + String message = String.format("Expected key type is '%s' but got '%s'", + keyType.getSimpleName(), + key.getClass().getSimpleName()); + throw new AmqpErrorException(AmqpError.DECODE_ERROR, message); + } + + Object value = handler.parse(in); + if (value != null && !valueType.isAssignableFrom(value.getClass())) + { + String message = String.format("Expected key type is '%s' but got '%s'", + valueType.getSimpleName(), + value.getClass().getSimpleName()); + throw new AmqpErrorException(AmqpError.DECODE_ERROR, message); + } + + Object oldValue; + if ((oldValue = map.put((T)key, (S)value)) != null) + { + String message = String.format("Map cannot have duplicate keys: %s has values (%s, %s)", + key, + oldValue, + value); + throw new AmqpErrorException(AmqpError.DECODE_ERROR, message); + } + + } + return map; + } + + + public static MapConstructor getInstance(int size) + { + return new MapConstructor(size); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org