http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.cpp b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.cpp new file mode 100644 index 0000000..19a40ed --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.cpp @@ -0,0 +1,396 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/jms_hdrs_props_test/Receiver.hpp" + +#include <iostream> +#include <json/json.h> +#include "proton/connection.hpp" +#include "proton/default_container.hpp" +#include "proton/delivery.hpp" +#include "proton/transport.hpp" +#include "qpidit/QpidItErrors.hpp" + +namespace qpidit +{ + namespace jms_hdrs_props_test + { + Receiver::Receiver(const std::string& brokerUrl, + const std::string& jmsMessageType, + const Json::Value& testNumberMap, + const Json::Value& flagMap): + _brokerUrl(brokerUrl), + _jmsMessageType(jmsMessageType), + _testNumberMap(testNumberMap), + _flagMap(flagMap), + _subTypeList(testNumberMap.getMemberNames()), + _subTypeIndex(0), + _expected(getTotalNumExpectedMsgs(testNumberMap)), + _received(0UL), + _receivedSubTypeList(Json::arrayValue), + _receivedValueMap(Json::objectValue), + _receivedHeadersMap(Json::objectValue), + _receivedPropertiesMap(Json::objectValue) + {} + + Receiver::~Receiver() {} + + Json::Value& Receiver::getReceivedValueMap() { + return _receivedValueMap; + } + + Json::Value& Receiver::getReceivedHeadersMap() { + return _receivedHeadersMap; + } + + Json::Value& Receiver::getReceivedPropertiesMap() { + return _receivedPropertiesMap; + } + + void Receiver::on_container_start(proton::container &c) { + c.open_receiver(_brokerUrl); + } + + void Receiver::on_message(proton::delivery &d, proton::message &m) { + try { + if (_received < _expected) { + int8_t t = qpidit::JMS_MESSAGE_TYPE; + try {t = m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")).get<int8_t>();} + catch (const std::exception& e) { + std::cout << "JmsReceiver::on_message(): Missing annotation \"x-opt-jms-msg-type\"" << std::endl; + throw; + } + switch (t) { + case qpidit::JMS_MESSAGE_TYPE: + receiveJmsMessage(m); + break; + case qpidit::JMS_OBJECTMESSAGE_TYPE: + receiveJmsObjectMessage(m); + break; + case qpidit::JMS_MAPMESSAGE_TYPE: + receiveJmsMapMessage(m); + break; + case qpidit::JMS_BYTESMESSAGE_TYPE: + receiveJmsBytesMessage(m); + break; + case qpidit::JMS_STREAMMESSAGE_TYPE: + receiveJmsStreamMessage(m); + break; + case qpidit::JMS_TEXTMESSAGE_TYPE: + receiveJmsTextMessage(m); + break; + default:; + // TODO: handle error - unknown JMS message type + } + + processMessageHeaders(m); + processMessageProperties(m); + + std::string subType(_subTypeList[_subTypeIndex]); + // Increment the subtype if the required number of messages have been received + if (_receivedSubTypeList.size() >= _testNumberMap[subType].asInt() && + _subTypeIndex < _testNumberMap.size()) { + _receivedValueMap[subType] = _receivedSubTypeList; + _receivedSubTypeList.clear(); + ++_subTypeIndex; + } + _received++; + if (_received >= _expected) { + d.receiver().close(); + d.connection().close(); + } + } + } catch (const std::exception&) { + d.receiver().close(); + d.connection().close(); + throw; + } + } + + //static + uint32_t Receiver::getTotalNumExpectedMsgs(const Json::Value testNumberMap) { + uint32_t total(0UL); + for (Json::Value::const_iterator i=testNumberMap.begin(); i!=testNumberMap.end(); ++i) { + total += (*i).asUInt(); + } + return total; + + } + + // protected + + void Receiver::receiveJmsMessage(const proton::message& msg) { + _receivedSubTypeList.append(Json::Value()); + } + + void Receiver::receiveJmsObjectMessage(const proton::message& msg) { + // TODO + } + + void Receiver::receiveJmsMapMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_MAPMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + std::map<std::string, proton::value> m; + msg.body().get(m); + for (std::map<std::string, proton::value>::const_iterator i=m.begin(); i!=m.end(); ++i) { + std::string key = i->first; + if (subType.compare(key.substr(0, key.size()-3)) != 0) { + throw qpidit::IncorrectJmsMapKeyPrefixError(subType, key); + } + proton::value val = i->second; + if (subType.compare("boolean") == 0) { + _receivedSubTypeList.append(val.get<bool>() ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val.get<int8_t>()))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(std::string(val.get<proton::binary>()))); + } else if (subType.compare("char") == 0) { + std::ostringstream oss; + oss << (char)val.get<wchar_t>(); + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + double d = val.get<double>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); + } else if (subType.compare("float") == 0) { + float f = val.get<float>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); + } else if (subType.compare("int") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val.get<int32_t>()))); + } else if (subType.compare("long") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val.get<int64_t>()))); + } else if (subType.compare("short") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val.get<int16_t>()))); + } else if (subType.compare("string") == 0) { + _receivedSubTypeList.append(Json::Value(val.get<std::string>())); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + } + + void Receiver::receiveJmsBytesMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_BYTESMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + proton::binary body = msg.body().get<proton::binary>(); + if (subType.compare("boolean") == 0) { + if (body.size() != 1) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=boolean", 1, body.size()); + _receivedSubTypeList.append(body[0] ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + if (body.size() != sizeof(int8_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=byte", sizeof(int8_t), body.size()); + int8_t val = *((int8_t*)body.data()); + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(val))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(std::string(body))); + } else if (subType.compare("char") == 0) { + if (body.size() != sizeof(uint16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=char", sizeof(uint16_t), body.size()); + // TODO: This is ugly: ignoring first byte - handle UTF-16 correctly + char c = body[1]; + std::ostringstream oss; + oss << c; + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=double", sizeof(int64_t), body.size()); + int64_t val = be64toh(*((int64_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val, true, false))); + } else if (subType.compare("float") == 0) { + if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=float", sizeof(int32_t), body.size()); + int32_t val = be32toh(*((int32_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val, true, false))); + } else if (subType.compare("long") == 0) { + if (body.size() != sizeof(int64_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=long", sizeof(int64_t), body.size()); + int64_t val = be64toh(*((int64_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(val))); + } else if (subType.compare("int") == 0) { + if (body.size() != sizeof(int32_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=int", sizeof(int32_t), body.size()); + int32_t val = be32toh(*((int32_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(val))); + } else if (subType.compare("short") == 0) { + if (body.size() != sizeof(int16_t)) throw IncorrectMessageBodyLengthError("JmsReceiver::receiveJmsBytesMessage, subType=short", sizeof(int16_t), body.size()); + int16_t val = be16toh(*((int16_t*)body.data())); + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(val))); + } else if (subType.compare("string") == 0) { + // TODO: decode string size in first two bytes and check string size + _receivedSubTypeList.append(Json::Value(std::string(body).substr(2))); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + + void Receiver::receiveJmsStreamMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_STREAMMESSAGE_TYPE"); + } + std::string subType(_subTypeList[_subTypeIndex]); + std::vector<proton::value> l; + msg.body().get(l); + for (std::vector<proton::value>::const_iterator i=l.begin(); i!=l.end(); ++i) { + if (subType.compare("boolean") == 0) { + _receivedSubTypeList.append(i->get<bool>() ? Json::Value("True") : Json::Value("False")); + } else if (subType.compare("byte") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int8_t>(i->get<int8_t>()))); + } else if (subType.compare("bytes") == 0) { + _receivedSubTypeList.append(Json::Value(std::string(i->get<proton::binary>()))); + } else if (subType.compare("char") == 0) { + std::ostringstream oss; + oss << (char)i->get<wchar_t>(); + _receivedSubTypeList.append(Json::Value(oss.str())); + } else if (subType.compare("double") == 0) { + double d = i->get<double>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(*((int64_t*)&d), true, false))); + } else if (subType.compare("float") == 0) { + float f = i->get<float>(); + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(*((int32_t*)&f), true, false))); + } else if (subType.compare("int") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int32_t>(i->get<int32_t>()))); + } else if (subType.compare("long") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int64_t>(i->get<int64_t>()))); + } else if (subType.compare("short") == 0) { + _receivedSubTypeList.append(Json::Value(toHexStr<int16_t>(i->get<int16_t>()))); + } else if (subType.compare("string") == 0) { + _receivedSubTypeList.append(Json::Value(i->get<std::string>())); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + } + + } + + void Receiver::receiveJmsTextMessage(const proton::message& msg) { + if(_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") != 0) { + throw qpidit::IncorrectMessageBodyTypeError(_jmsMessageType, "JMS_TEXTMESSAGE_TYPE"); + } + _receivedSubTypeList.append(Json::Value(msg.body().get<std::string>())); + } + + void Receiver::processMessageHeaders(const proton::message& msg) { + addMessageHeaderString("JMS_TYPE_HEADER", msg.subject()); + if (_flagMap.isMember("JMS_CORRELATIONID_AS_BYTES") && _flagMap["JMS_CORRELATIONID_AS_BYTES"].asBool()) { + addMessageHeaderByteArray("JMS_CORRELATIONID_HEADER", proton::get<proton::binary>(msg.correlation_id())); + } else { + try { + addMessageHeaderString("JMS_CORRELATIONID_HEADER", proton::get<std::string>(msg.correlation_id())); + } catch (const std::exception& e) {} // TODO: UGLY, how do you check if there _is_ a correlation id? + } + + std::string reply_to = msg.reply_to(); + // Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present + if (_flagMap.isMember("JMS_REPLYTO_AS_TOPIC") && _flagMap["JMS_REPLYTO_AS_TOPIC"].asBool()) { + if (reply_to.find("topic://") == 0) { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_TOPIC, reply_to.substr(8)); + } else { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_TOPIC, reply_to); + } + } else { + if (reply_to.find("queue://") == 0) { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_QUEUE, reply_to.substr(8)); + } else { + addMessageHeaderDestination("JMS_REPLYTO_HEADER", qpidit::JMS_QUEUE, reply_to); + } + } + } + + void Receiver::addMessageHeaderString(const char* headerName, const std::string& value) { + if (!value.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty strings are allowed in headers + Json::Value valueMap(Json::objectValue); + valueMap["string"] = value; + _receivedHeadersMap[headerName] = valueMap; + } + } + + void Receiver::addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba) { + if (!ba.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty binaries are allowed in headers + Json::Value valueMap(Json::objectValue); + valueMap["bytes"] = std::string(ba); + _receivedHeadersMap[headerName] = valueMap; + } + } + + void Receiver::addMessageHeaderDestination(const std::string& headerName, qpidit::jmsDestinationType_t dt, const std::string& d) { + if (!d.empty()) { + Json::Value valueMap(Json::objectValue); + switch (dt) { + case qpidit::JMS_QUEUE: + valueMap["queue"] = d; + break; + case qpidit::JMS_TOPIC: + valueMap["topic"] = d; + break; + default: + ; // TODO: Handle error: remaining JMS destinations not handled. + } + _receivedHeadersMap[headerName] = valueMap; + } + } + + void Receiver::processMessageProperties(const proton::message& msg) { + // TODO: Add this function when PROTON-1284 is fixed +// std::map<proton::value, proton::value> props; +// msg.properties().value() >> props; + } + + } /* namespace jms_hdrs_props_test */ +} /* namespace qpidit */ + +/* --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: JMS message type + * 4: JSON Test parameters containing 2 maps: [testValuesMap, flagMap] + */ +int main(int argc, char** argv) { + /* + for (int i=0; i<argc; ++i) { + std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl; + } + */ + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + Json::Value testParams; + Json::Reader jsonReader; + if (not jsonReader.parse(argv[4], testParams, false)) { + throw qpidit::JsonParserError(jsonReader); + } + + qpidit::jms_hdrs_props_test::Receiver receiver(oss.str(), argv[3], testParams[0], testParams[1]); + proton::default_container(receiver).run(); + + Json::FastWriter fw; + std::cout << argv[3] << std::endl; + Json::Value returnList(Json::arrayValue); + returnList.append(receiver.getReceivedValueMap()); + returnList.append(receiver.getReceivedHeadersMap()); + returnList.append(receiver.getReceivedPropertiesMap()); + std::cout << fw.write(returnList); + } catch (const std::exception& e) { + std::cout << "JmsReceiver error: " << e.what() << std::endl; + } +}
http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.hpp new file mode 100644 index 0000000..36db58d --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Receiver.hpp @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_JMS_HEADERS_PROPERTIES_TEST_RECEIVER_HPP_ +#define SRC_QPIDIT_JMS_HEADERS_PROPERTIES_TEST_RECEIVER_HPP_ + +#include <iomanip> +#include <json/value.h> +#include "proton/messaging_handler.hpp" +#include "proton/types.hpp" +#include "qpidit/JmsTestBase.hpp" +#include <sstream> + +namespace qpidit +{ + namespace jms_hdrs_props_test + { + + class Receiver : public qpidit::JmsTestBase + { + protected: + const std::string _brokerUrl; + const std::string _jmsMessageType; + const Json::Value _testNumberMap; + const Json::Value _flagMap; + Json::Value::Members _subTypeList; + int _subTypeIndex; + uint32_t _expected; + uint32_t _received; + Json::Value _receivedSubTypeList; + Json::Value _receivedValueMap; + Json::Value _receivedHeadersMap; + Json::Value _receivedPropertiesMap; + public: + Receiver(const std::string& brokerUrl, + const std::string& jmsMessageType, + const Json::Value& testNumberMap, + const Json::Value& flagMap); + virtual ~Receiver(); + Json::Value& getReceivedValueMap(); + Json::Value& getReceivedHeadersMap(); + Json::Value& getReceivedPropertiesMap(); + void on_container_start(proton::container &c); + void on_message(proton::delivery &d, proton::message &m); + + static uint32_t getTotalNumExpectedMsgs(const Json::Value testNumberMap); + + protected: + void receiveJmsMessage(const proton::message& msg); + void receiveJmsObjectMessage(const proton::message& msg); + void receiveJmsMapMessage(const proton::message& msg); + void receiveJmsBytesMessage(const proton::message& msg); + void receiveJmsStreamMessage(const proton::message& msg); + void receiveJmsTextMessage(const proton::message& msg); + + void processMessageHeaders(const proton::message& msg); + void addMessageHeaderString(const char* headerName, const std::string& value); + void addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba); + void addMessageHeaderDestination(const std::string& headerName, qpidit::jmsDestinationType_t dt, const std::string& d); + void processMessageProperties(const proton::message& msg); + + // Format signed numbers in negative hex format if signedFlag is true, ie -0xNNNN, positive numbers in 0xNNNN format + template<typename T> static std::string toHexStr(T val, bool fillFlag = false, bool signedFlag = true) { + std::ostringstream oss; + bool neg = false; + if (signedFlag) { + neg = val < 0; + if (neg) val = -val; + } + oss << (neg ? "-" : "") << "0x" << std::hex; + if (fillFlag) { + oss << std::setw(sizeof(T)*2) << std::setfill('0'); + } + oss << (sizeof(T) == 1 ? (int)val & 0xff : sizeof(T) == 2 ? val & 0xffff : sizeof(T) == 4 ? val & 0xffffffff : val); + return oss.str(); + } + }; + + } /* namespace jms_hdrs_props_test */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_JMS_HEADERS_PROPERTIES_TEST_RECEIVER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.cpp b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.cpp new file mode 100644 index 0000000..e3ff0e6 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.cpp @@ -0,0 +1,451 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpidit/jms_hdrs_props_test/Sender.hpp" + +#include <cerrno> +#include <iomanip> +#include <iostream> +#include <json/json.h> +#include "proton/connection.hpp" +#include "proton/default_container.hpp" +#include "proton/tracker.hpp" +#include "proton/transport.hpp" +#include <stdio.h> + +namespace qpidit +{ + namespace jms_hdrs_props_test + { + Sender::Sender(const std::string& brokerUrl, + const std::string& jmsMessageType, + const Json::Value& testParams) : + _brokerUrl(brokerUrl), + _jmsMessageType(jmsMessageType), + _testValueMap(testParams[0]), + _testHeadersMap(testParams[1]), + _testPropertiesMap(testParams[2]), + _msgsSent(0), + _msgsConfirmed(0), + _totalMsgs(getTotalNumMessages(_testValueMap)) + { + if (_testValueMap.type() != Json::objectValue) { + throw qpidit::InvalidJsonRootNodeError(Json::objectValue, _testValueMap.type()); + } + } + + Sender::~Sender() {} + + void Sender::on_container_start(proton::container &c) { + c.open_sender(_brokerUrl); + } + + void Sender::on_sendable(proton::sender &s) { + if (_totalMsgs == 0) { + s.connection().close(); + } else if (_msgsSent == 0) { + Json::Value::Members subTypes = _testValueMap.getMemberNames(); + std::sort(subTypes.begin(), subTypes.end()); + for (std::vector<std::string>::const_iterator i=subTypes.begin(); i!=subTypes.end(); ++i) { + sendMessages(s, *i, _testValueMap[*i]); + } + } + } + + void Sender::on_tracker_accept(proton::tracker &t) { + _msgsConfirmed++; + if (_msgsConfirmed == _totalMsgs) { + t.connection().close(); + } + } + + void Sender::on_transport_close(proton::transport &t) { + _msgsSent = _msgsConfirmed; + } + + // protected + + void Sender::sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValues) { + uint32_t valueNumber = 0; + for (Json::Value::const_iterator i=testValues.begin(); i!=testValues.end(); ++i) { + if (s.credit()) { + proton::message msg; + if (_jmsMessageType.compare("JMS_MESSAGE_TYPE") == 0) { + setMessage(msg, subType, (*i).asString()); + } else if (_jmsMessageType.compare("JMS_BYTESMESSAGE_TYPE") == 0) { + setBytesMessage(msg, subType, (*i).asString()); + } else if (_jmsMessageType.compare("JMS_MAPMESSAGE_TYPE") == 0) { + setMapMessage(msg, subType, (*i).asString(), valueNumber); + } else if (_jmsMessageType.compare("JMS_OBJECTMESSAGE_TYPE") == 0) { + setObjectMessage(msg, subType, *i); + } else if (_jmsMessageType.compare("JMS_STREAMMESSAGE_TYPE") == 0) { + setStreamMessage(msg, subType, (*i).asString()); + } else if (_jmsMessageType.compare("JMS_TEXTMESSAGE_TYPE") == 0) { + setTextMessage(msg, *i); + } else { + throw qpidit::UnknownJmsMessageTypeError(_jmsMessageType); + } + addMessageHeaders(msg); + addMessageProperties(msg); + s.send(msg); + _msgsSent += 1; + valueNumber += 1; + } + } + + } + + proton::message& Sender::setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { + if (subType.compare("none") != 0) { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + if (testValueStr.size() != 0) { + throw InvalidTestValueError(subType, testValueStr); + } + msg.content_type(proton::symbol("application/octet-stream")); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { + proton::binary bin; + if (subType.compare("boolean") == 0) { + if (testValueStr.compare("False") == 0) bin.push_back(char(0)); + else if (testValueStr.compare("True") == 0) bin.push_back(char(1)); + else throw InvalidTestValueError(subType, testValueStr); + } else if (subType.compare("byte") == 0) { + uint8_t val = getIntegralValue<int8_t>(testValueStr); + bin.push_back(char(val)); + } else if (subType.compare("bytes") == 0) { + bin.assign(testValueStr.begin(), testValueStr.end()); + } else if (subType.compare("char") == 0) { + bin.push_back(char(0)); + if (testValueStr[0] == '\\') { // Format: '\xNN' + bin.push_back(getIntegralValue<char>(testValueStr.substr(2))); + } else { // Format: 'c' + bin.push_back(testValueStr[0]); + } + } else if (subType.compare("double") == 0) { + uint64_t val; + try { + val = htobe64(std::strtoul(testValueStr.data(), NULL, 16)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("double", testValueStr); } + numToBinary(val, bin); + //for (int i=0; i<sizeof(val); ++i) { + // bin.push_back(* ((char*)&val + i)); + // } + } else if (subType.compare("float") == 0) { + uint32_t val; + try { + val = htobe32((uint32_t)std::strtoul(testValueStr.data(), NULL, 16)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError("float", testValueStr); } + numToBinary(val, bin); + //for (int i=0; i<sizeof(val); ++i) { + // bin.push_back(* ((char*)&val + i)); + //} + } else if (subType.compare("long") == 0) { + uint64_t val = htobe64(getIntegralValue<uint64_t>(testValueStr)); + numToBinary(val, bin); + //bin.assign(sizeof(val), val); + } else if (subType.compare("int") == 0) { + uint32_t val = htobe32(getIntegralValue<uint32_t>(testValueStr)); + numToBinary(val, bin); + //bin.assign(sizeof(val), val); + } else if (subType.compare("short") == 0) { + uint16_t val = htobe16(getIntegralValue<int16_t>(testValueStr)); + numToBinary(val, bin); + //bin.assign(sizeof(val), val); + } else if (subType.compare("string") == 0) { + std::ostringstream oss; + uint16_t strlen = htobe16((uint16_t)testValueStr.size()); + oss.write((char*)&strlen, sizeof(strlen)); + oss << testValueStr; + std::string os = oss.str(); + bin.assign(os.begin(), os.end()); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + msg.body(bin); + msg.inferred(true); + msg.content_type(proton::symbol("application/octet-stream")); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_BYTESMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber) { + std::ostringstream oss; + oss << subType << std::setw(3) << std::setfill('0') << valueNumber; + std::string mapKey(oss.str()); + std::map<std::string, proton::value> m; + if (subType.compare("boolean") == 0) { + if (testValueStr.compare("False") == 0) m[mapKey] = false; + else if (testValueStr.compare("True") == 0) m[mapKey] = true; + else throw InvalidTestValueError(subType, testValueStr); + } else if (subType.compare("byte") == 0) { + m[mapKey] = int8_t(getIntegralValue<int8_t>(testValueStr)); + } else if (subType.compare("bytes") == 0) { + m[mapKey] = proton::binary(testValueStr); + } else if (subType.compare("char") == 0) { + wchar_t val; + if (testValueStr[0] == '\\') { // Format: '\xNN' + val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2)); + } else { // Format: 'c' + val = testValueStr[0]; + } + m[mapKey] = val; + } else if (subType.compare("double") == 0) { + m[mapKey] = getFloatValue<double, uint64_t>(testValueStr); + } else if (subType.compare("float") == 0) { + m[mapKey] = getFloatValue<float, uint32_t>(testValueStr); + } else if (subType.compare("int") == 0) { + m[mapKey] = getIntegralValue<int32_t>(testValueStr); + } else if (subType.compare("long") == 0) { + m[mapKey] = getIntegralValue<int64_t>(testValueStr); + } else if (subType.compare("short") == 0) { + m[mapKey] = getIntegralValue<int16_t>(testValueStr); + } else if (subType.compare("string") == 0) { + m[mapKey] = testValueStr; + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + msg.inferred(false); + msg.body(m); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_MAPMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue) { + msg.body(getJavaObjectBinary(subType, testValue.asString())); + msg.inferred(true); + msg.content_type(proton::symbol("application/x-java-serialized-object")); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_OBJECTMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr) { + std::vector<proton::value> l; + if (subType.compare("boolean") == 0) { + if (testValueStr.compare("False") == 0) l.push_back(false); + else if (testValueStr.compare("True") == 0) l.push_back(true); + else throw InvalidTestValueError(subType, testValueStr); + } else if (subType.compare("byte") == 0) { + l.push_back(int8_t(getIntegralValue<int8_t>(testValueStr))); + } else if (subType.compare("bytes") == 0) { + l.push_back(proton::binary(testValueStr)); + } else if (subType.compare("char") == 0) { + wchar_t val; + if (testValueStr[0] == '\\') { // Format: '\xNN' + val = (wchar_t)getIntegralValue<wchar_t>(testValueStr.substr(2)); + } else { // Format: 'c' + val = testValueStr[0]; + } + l.push_back(val); + } else if (subType.compare("double") == 0) { + l.push_back(getFloatValue<double, uint64_t>(testValueStr)); + } else if (subType.compare("float") == 0) { + l.push_back(getFloatValue<float, uint32_t>(testValueStr)); + } else if (subType.compare("int") == 0) { + l.push_back(getIntegralValue<int32_t>(testValueStr)); + } else if (subType.compare("long") == 0) { + l.push_back(getIntegralValue<int64_t>(testValueStr)); + } else if (subType.compare("short") == 0) { + l.push_back(getIntegralValue<int16_t>(testValueStr)); + } else if (subType.compare("string") == 0) { + l.push_back(testValueStr); + } else { + throw qpidit::UnknownJmsMessageSubTypeError(subType); + } + msg.body(l); + msg.inferred(true); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_STREAMMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::setTextMessage(proton::message& msg, const Json::Value& testValue) { + msg.body(testValue.asString()); + msg.inferred(false); + msg.message_annotations().put(proton::symbol("x-opt-jms-msg-type"), s_jmsMessageTypeAnnotationValues["JMS_TEXTMESSAGE_TYPE"]); + return msg; + } + + proton::message& Sender::addMessageHeaders(proton::message& msg) { + Json::Value::Members headerNames = _testHeadersMap.getMemberNames(); + for (std::vector<std::string>::const_iterator i=headerNames.begin(); i!=headerNames.end(); ++i) { + const Json::Value _subMap = _testHeadersMap[*i]; + const std::string headerValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map + std::string val = _subMap[headerValueType].asString(); + if (i->compare("JMS_TYPE_HEADER") == 0) { + setJmsTypeHeader(msg, val); + } else if (i->compare("JMS_CORRELATIONID_HEADER") == 0) { + if (headerValueType.compare("bytes") == 0) { + setJmsCorrelationId(msg, proton::binary(val)); + } else { + setJmsCorrelationId(msg, val); + } + } else if (i->compare("JMS_REPLYTO_HEADER") == 0) { + setJmsReplyTo(msg, headerValueType, val); + } else { + throw qpidit::UnknownJmsHeaderTypeError(*i); + } + } + return msg; + } + + //static + proton::message& Sender::setJmsTypeHeader(proton::message& msg, const std::string& t) { + msg.subject(t); + return msg; + } + + //static + proton::message& Sender::setJmsCorrelationId(proton::message& msg, const std::string& cid) { + proton::message_id mid(cid); + msg.correlation_id(mid); + msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); + return msg; + } + + //static + proton::message& Sender::setJmsCorrelationId(proton::message& msg, const proton::binary cid) { + proton::message_id mid(cid); + msg.correlation_id(cid); + msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); + return msg; + } + + //static + proton::message& Sender::setJmsReplyTo(proton::message& msg, const std::string& dts, const std::string& d) { + if (dts.compare("queue") == 0) { + msg.reply_to(/*std::string("queue://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_QUEUE)); + } else if (dts.compare("temp_queue") == 0) { + msg.reply_to(/*std::string("queue://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_TMEP_QUEUE)); + } else if (dts.compare("topic") == 0) { + msg.reply_to(/*std::string("topic://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_TOPIC)); + } else if (dts.compare("temp_topic") == 0) { + msg.reply_to(/*std::string("topic://") + */d); + msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(qpidit::JMS_TEMP_TOPIC)); + } else { + throw qpidit::UnknownJmsDestinationTypeError(dts); + } + return msg; + } + + proton::message& Sender::addMessageProperties(proton::message& msg) { + Json::Value::Members propertyNames = _testPropertiesMap.getMemberNames(); + for (std::vector<std::string>::const_iterator i=propertyNames.begin(); i!=propertyNames.end(); ++i) { + const Json::Value _subMap = _testPropertiesMap[*i]; + const std::string propertyValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map + std::string val = _subMap[propertyValueType].asString(); + if (propertyValueType.compare("boolean") == 0) { + if (val.compare("False") == 0) setMessageProperty(msg, *i, false); + else if (val.compare("True") == 0) setMessageProperty(msg, *i, true); + else throw InvalidTestValueError(propertyValueType, val); + } else if (propertyValueType.compare("byte") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int8_t>(val)); + } else if (propertyValueType.compare("double") == 0) { + setMessageProperty(msg, *i, getFloatValue<double, uint64_t>(val)); + } else if (propertyValueType.compare("float") == 0) { + setMessageProperty(msg, *i, getFloatValue<float, uint64_t>(val)); + } else if (propertyValueType.compare("int") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int32_t>(val)); + } else if (propertyValueType.compare("long") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int64_t>(val)); + } else if (propertyValueType.compare("short") == 0) { + setMessageProperty(msg, *i, getIntegralValue<int16_t>(val)); + } else if (propertyValueType.compare("string") == 0) { + setMessageProperty(msg, *i, val); + } else { + throw qpidit::UnknownJmsPropertyTypeError(propertyValueType); + } + } + return msg; + } + + //static + proton::binary Sender::getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString) { + proton::binary javaObjectBinary; + char buf[1024]; + int bytesRead; + FILE* fp = ::popen("java -cp target/JavaObjUtils.jar org.apache.qpid.interop_test.obj_util.JavaObjToBytes javaClassStr", "rb"); + if (fp == NULL) { throw qpidit::PopenError(errno); } + do { + bytesRead = ::fread(buf, 1, sizeof(buf), fp); + javaObjectBinary.insert(javaObjectBinary.end(), &buf[0], &buf[bytesRead-1]); + } while (bytesRead == sizeof(buf)); + int status = ::pclose(fp); + if (status == -1) { + throw qpidit::PcloseError(errno); + } + return javaObjectBinary; + } + + // static + uint32_t Sender::getTotalNumMessages(const Json::Value& testValueMap) { + uint32_t tot = 0; + for (Json::Value::const_iterator i = testValueMap.begin(); i != testValueMap.end(); ++i) { + tot += (*i).size(); + } + return tot; + } + + } /* namespace jms_hdrs_props_test */ +} /* namespace qpidit */ + + + +/* + * --- main --- + * Args: 1: Broker address (ip-addr:port) + * 2: Queue name + * 3: AMQP type + * 4: JSON Test parameters containing 3 maps: [testValueMap, testHeadersMap, testPropertiesMap] + */ + +int main(int argc, char** argv) { +/* + for (int i=0; i<argc; ++i) { + std::cout << "*** argv[" << i << "] : " << argv[i] << std::endl; + } +*/ + // TODO: improve arg management a little... + if (argc != 5) { + throw qpidit::ArgumentError("Incorrect number of arguments"); + } + + std::ostringstream oss; + oss << argv[1] << "/" << argv[2]; + + try { + Json::Value testParams; + Json::Reader jsonReader; + if (not jsonReader.parse(argv[4], testParams, false)) { + throw qpidit::JsonParserError(jsonReader); + } + + qpidit::jms_hdrs_props_test::Sender sender(oss.str(), argv[3], testParams); + proton::default_container(sender).run(); + } catch (const std::exception& e) { + std::cout << "JmsSender error: " << e.what() << std::endl; + } +} http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.hpp new file mode 100644 index 0000000..8e3efe7 --- /dev/null +++ b/shims/qpid-proton-cpp/src/qpidit/jms_hdrs_props_test/Sender.hpp @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef SRC_QPIDIT_JMS_HEADERS_PROPERTIES_TEST_SENDER_HPP_ +#define SRC_QPIDIT_JMS_HEADERS_PROPERTIES_TEST_SENDER_HPP_ + +#include "json/value.h" +#include "proton/message.hpp" +#include "proton/messaging_handler.hpp" +#include "qpidit/JmsTestBase.hpp" +#include "qpidit/QpidItErrors.hpp" +#include <typeinfo> + +namespace proton { + class message; +} + +namespace qpidit +{ + namespace jms_hdrs_props_test + { + + class Sender : public qpidit::JmsTestBase + { + protected: + const std::string _brokerUrl; + const std::string _jmsMessageType; + const Json::Value _testValueMap; + const Json::Value _testHeadersMap; + const Json::Value _testPropertiesMap; + uint32_t _msgsSent; + uint32_t _msgsConfirmed; + uint32_t _totalMsgs; + public: + Sender(const std::string& brokerUrl, const std::string& jmsMessageType, const Json::Value& testParams); + virtual ~Sender(); + + void on_container_start(proton::container &c); + void on_sendable(proton::sender &s); + void on_tracker_accept(proton::tracker &t); + void on_transport_close(proton::transport &t); + protected: + void sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValueMap); + proton::message& setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr); + proton::message& setBytesMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr); + proton::message& setMapMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr, uint32_t valueNumber); + proton::message& setObjectMessage(proton::message& msg, const std::string& subType, const Json::Value& testValue); + proton::message& setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValue); + proton::message& setTextMessage(proton::message& msg, const Json::Value& testValue); + + proton::message& addMessageHeaders(proton::message& msg); + static proton::message& setJmsTypeHeader(proton::message& msg, const std::string& t); + static proton::message& setJmsCorrelationId(proton::message& msg, const std::string& cid); + static proton::message& setJmsCorrelationId(proton::message& msg, const proton::binary cid); + static proton::message& setJmsReplyTo(proton::message& msg, const std::string& dt, const std::string& d); + + proton::message& addMessageProperties(proton::message& msg); + template<typename T> proton::message& setMessageProperty(proton::message& msg, const std::string& propertyName, T val) { + msg.properties().put(propertyName, val); + return msg; + } + + static proton::binary getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString); + static uint32_t getTotalNumMessages(const Json::Value& testValueMap); + + template<typename T> static T numToBinary(T n, proton::binary& b) { + for (int i=0; i<sizeof(n); ++i) { + b.push_back(* ((char*)&n + i)); + } + } + + // Set message body to floating type T through integral type U + // Used to convert a hex string representation of a float or double to a float or double + template<typename T, typename U> T getFloatValue(const std::string& testValueStr) { + try { + U ival(std::strtoul(testValueStr.data(), NULL, 16)); + return T(*reinterpret_cast<T*>(&ival)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); } + } + + template<typename T> T getIntegralValue(const std::string& testValueStr) { + try { + return T(std::strtol(testValueStr.data(), NULL, 16)); + } catch (const std::exception& e) { throw qpidit::InvalidTestValueError(typeid(T).name(), testValueStr); } + } + }; + + } /* namespace jms_hdrs_props_test */ +} /* namespace qpidit */ + +#endif /* SRC_QPIDIT_JMS_HEADERS_PROPERTIES_TEST_SENDER_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp deleted file mode 100644 index 6f99813..0000000 --- a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/JmsDefinitions.hpp +++ /dev/null @@ -1,46 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef SRC_QPIDIT_JMS_MESSAGES_TEST_JMSDEFINTIONS_HPP_ -#define SRC_QPIDIT_JMS_MESSAGES_TEST_JMSDEFINTIONS_HPP_ - -namespace qpidit -{ - namespace jms_messages_test - { - typedef enum {JMS_QUEUE = 0, - JMS_TOPIC, - JMS_TMEP_QUEUE, - JMS_TEMP_TOPIC} - jmsDestinationType_t; - - typedef enum {JMS_MESSAGE_TYPE=0, - JMS_OBJECTMESSAGE_TYPE, - JMS_MAPMESSAGE_TYPE, - JMS_BYTESMESSAGE_TYPE, - JMS_STREAMMESSAGE_TYPE, - JMS_TEXTMESSAGE_TYPE} - jmsMessageType_t; - - } -} - -#endif /* SRC_QPIDIT_JMS_MESSAGES_TEST_JMSDEFINTIONS_HPP_ */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp index ed1dfa5..43bd34f 100644 --- a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.cpp @@ -33,27 +33,18 @@ namespace qpidit { namespace jms_messages_test { - //static - proton::symbol Receiver::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type"); - std::map<std::string, int8_t>Receiver::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap(); - - Receiver::Receiver(const std::string& brokerUrl, const std::string& jmsMessageType, - const Json::Value& testNumberMap, - const Json::Value& flagMap): + const Json::Value& testNumberMap): _brokerUrl(brokerUrl), _jmsMessageType(jmsMessageType), _testNumberMap(testNumberMap), - _flagMap(flagMap), _subTypeList(testNumberMap.getMemberNames()), _subTypeIndex(0), _expected(getTotalNumExpectedMsgs(testNumberMap)), _received(0UL), _receivedSubTypeList(Json::arrayValue), - _receivedValueMap(Json::objectValue), - _receivedHeadersMap(Json::objectValue), - _receivedPropertiesMap(Json::objectValue) + _receivedValueMap(Json::objectValue) {} Receiver::~Receiver() {} @@ -62,14 +53,6 @@ namespace qpidit return _receivedValueMap; } - Json::Value& Receiver::getReceivedHeadersMap() { - return _receivedHeadersMap; - } - - Json::Value& Receiver::getReceivedPropertiesMap() { - return _receivedPropertiesMap; - } - void Receiver::on_container_start(proton::container &c) { c.open_receiver(_brokerUrl); } @@ -77,38 +60,35 @@ namespace qpidit void Receiver::on_message(proton::delivery &d, proton::message &m) { try { if (_received < _expected) { - int8_t t = JMS_MESSAGE_TYPE; + int8_t t = qpidit::JMS_MESSAGE_TYPE; try {t = m.message_annotations().get(proton::symbol("x-opt-jms-msg-type")).get<int8_t>();} catch (const std::exception& e) { std::cout << "JmsReceiver::on_message(): Missing annotation \"x-opt-jms-msg-type\"" << std::endl; throw; } switch (t) { - case JMS_MESSAGE_TYPE: + case qpidit::JMS_MESSAGE_TYPE: receiveJmsMessage(m); break; - case JMS_OBJECTMESSAGE_TYPE: + case qpidit::JMS_OBJECTMESSAGE_TYPE: receiveJmsObjectMessage(m); break; - case JMS_MAPMESSAGE_TYPE: + case qpidit::JMS_MAPMESSAGE_TYPE: receiveJmsMapMessage(m); break; - case JMS_BYTESMESSAGE_TYPE: + case qpidit::JMS_BYTESMESSAGE_TYPE: receiveJmsBytesMessage(m); break; - case JMS_STREAMMESSAGE_TYPE: + case qpidit::JMS_STREAMMESSAGE_TYPE: receiveJmsStreamMessage(m); break; - case JMS_TEXTMESSAGE_TYPE: + case qpidit::JMS_TEXTMESSAGE_TYPE: receiveJmsTextMessage(m); break; default:; // TODO: handle error - no known JMS message type } - processMessageHeaders(m); - processMessageProperties(m); - std::string subType(_subTypeList[_subTypeIndex]); // Increment the subtype if the required number of messages have been received if (_receivedSubTypeList.size() >= _testNumberMap[subType].asInt() && @@ -130,26 +110,6 @@ namespace qpidit } } - void Receiver::on_connection_error(proton::connection &c) { - std::cerr << "JmsReceiver::on_connection_error(): " << c.error() << std::endl; - } - - void Receiver::on_receiver_error(proton::receiver& r) { - std::cerr << "JmsReceiver::on_receiver_error(): " << r.error() << std::endl; - } - - void Receiver::on_session_error(proton::session &s) { - std::cerr << "JmsReceiver::on_session_error(): " << s.error() << std::endl; - } - - void Receiver::on_transport_error(proton::transport &t) { - std::cerr << "JmsReceiver::on_transport_error(): " << t.error() << std::endl; - } - - void Receiver::on_error(const proton::error_condition &ec) { - std::cerr << "JmsReceiver::on_error(): " << ec << std::endl; - } - //static uint32_t Receiver::getTotalNumExpectedMsgs(const Json::Value testNumberMap) { uint32_t total(0UL); @@ -309,85 +269,6 @@ namespace qpidit _receivedSubTypeList.append(Json::Value(msg.body().get<std::string>())); } - void Receiver::processMessageHeaders(const proton::message& msg) { - addMessageHeaderString("JMS_TYPE_HEADER", msg.subject()); - if (_flagMap.isMember("JMS_CORRELATIONID_AS_BYTES") && _flagMap["JMS_CORRELATIONID_AS_BYTES"].asBool()) { - addMessageHeaderByteArray("JMS_CORRELATIONID_HEADER", proton::get<proton::binary>(msg.correlation_id())); - } else { - try { - addMessageHeaderString("JMS_CORRELATIONID_HEADER", proton::get<std::string>(msg.correlation_id())); - } catch (const std::exception& e) {} // TODO: UGLY, how do you check if there _is_ a correlation id? - } - - std::string reply_to = msg.reply_to(); - // Some brokers prepend 'queue://' and 'topic://' to reply_to addresses, strip these when present - if (_flagMap.isMember("JMS_REPLYTO_AS_TOPIC") && _flagMap["JMS_REPLYTO_AS_TOPIC"].asBool()) { - if (reply_to.find("topic://") == 0) { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to.substr(8)); - } else { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_TOPIC, reply_to); - } - } else { - if (reply_to.find("queue://") == 0) { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to.substr(8)); - } else { - addMessageHeaderDestination("JMS_REPLYTO_HEADER", JMS_QUEUE, reply_to); - } - } - } - - void Receiver::addMessageHeaderString(const char* headerName, const std::string& value) { - if (!value.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty strings are allowed in headers - Json::Value valueMap(Json::objectValue); - valueMap["string"] = value; - _receivedHeadersMap[headerName] = valueMap; - } - } - - void Receiver::addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba) { - if (!ba.empty()) { // TODO: Remove this test when PROTON-1288 is fixed as empty binaries are allowed in headers - Json::Value valueMap(Json::objectValue); - valueMap["bytes"] = std::string(ba); - _receivedHeadersMap[headerName] = valueMap; - } - } - - void Receiver::addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d) { - if (!d.empty()) { - Json::Value valueMap(Json::objectValue); - switch (dt) { - case JMS_QUEUE: - valueMap["queue"] = d; - break; - case JMS_TOPIC: - valueMap["topic"] = d; - break; - default: - ; // TODO: Handle error: remaining JMS destinations not handled. - } - _receivedHeadersMap[headerName] = valueMap; - } - } - - void Receiver::processMessageProperties(const proton::message& msg) { - // TODO: Add this function when PROTON-1284 is fixed -// std::map<proton::value, proton::value> props; -// msg.properties().value() >> props; - } - - //static - std::map<std::string, int8_t> Receiver::initializeJmsMessageTypeAnnotationMap() { - std::map<std::string, int8_t> m; - m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE; - m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE; - m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE; - m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE; - m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE; - m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE; - return m; - } - - } /* namespace jms_messages_test */ } /* namespace qpidit */ @@ -418,14 +299,12 @@ int main(int argc, char** argv) { throw qpidit::JsonParserError(jsonReader); } - qpidit::jms_messages_test::Receiver receiver(oss.str(), argv[3], testParams[0], testParams[1]); + qpidit::jms_messages_test::Receiver receiver(oss.str(), argv[3], testParams); proton::default_container(receiver).run(); Json::FastWriter fw; std::cout << argv[3] << std::endl; std::cout << fw.write(receiver.getReceivedValueMap()); - std::cout << fw.write(receiver.getReceivedHeadersMap()); - std::cout << fw.write(receiver.getReceivedPropertiesMap()); } catch (const std::exception& e) { std::cout << "JmsReceiver error: " << e.what() << std::endl; } http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp index 696b9dd..d7c183b 100644 --- a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Receiver.hpp @@ -26,7 +26,7 @@ #include <json/value.h> #include "proton/messaging_handler.hpp" #include "proton/types.hpp" -#include "qpidit/jms_messages_test/JmsDefinitions.hpp" +#include "qpidit/JmsTestBase.hpp" #include <sstream> namespace qpidit @@ -34,42 +34,28 @@ namespace qpidit namespace jms_messages_test { - class Receiver : public proton::messaging_handler + class Receiver : public qpidit::JmsTestBase { protected: - static proton::symbol s_jmsMessageTypeAnnotationKey; - static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues; - const std::string _brokerUrl; const std::string _jmsMessageType; const Json::Value _testNumberMap; - const Json::Value _flagMap; Json::Value::Members _subTypeList; int _subTypeIndex; uint32_t _expected; uint32_t _received; Json::Value _receivedSubTypeList; Json::Value _receivedValueMap; - Json::Value _receivedHeadersMap; - Json::Value _receivedPropertiesMap; + public: Receiver(const std::string& brokerUrl, const std::string& jmsMessageType, - const Json::Value& testNumberMap, - const Json::Value& flagMap); + const Json::Value& testNumberMap); virtual ~Receiver(); Json::Value& getReceivedValueMap(); - Json::Value& getReceivedHeadersMap(); - Json::Value& getReceivedPropertiesMap(); void on_container_start(proton::container &c); void on_message(proton::delivery &d, proton::message &m); - void on_connection_error(proton::connection &c); - void on_receiver_error(proton::receiver& r); - void on_session_error(proton::session &s); - void on_transport_error(proton::transport &t); - void on_error(const proton::error_condition &c); - static uint32_t getTotalNumExpectedMsgs(const Json::Value testNumberMap); protected: @@ -80,14 +66,6 @@ namespace qpidit void receiveJmsStreamMessage(const proton::message& msg); void receiveJmsTextMessage(const proton::message& msg); - void processMessageHeaders(const proton::message& msg); - void addMessageHeaderString(const char* headerName, const std::string& value); - void addMessageHeaderByteArray(const std::string& headerName, const proton::binary ba); - void addMessageHeaderDestination(const std::string& headerName, jmsDestinationType_t dt, const std::string& d); - void processMessageProperties(const proton::message& msg); - - static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap(); - // Format signed numbers in negative hex format if signedFlag is true, ie -0xNNNN, positive numbers in 0xNNNN format template<typename T> static std::string toHexStr(T val, bool fillFlag = false, bool signedFlag = true) { std::ostringstream oss; http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp index dbe2c0d..42ba558 100644 --- a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.cpp @@ -35,18 +35,12 @@ namespace qpidit { namespace jms_messages_test { - //static - proton::symbol Sender::s_jmsMessageTypeAnnotationKey("x-opt-jms-msg-type"); - std::map<std::string, int8_t>Sender::s_jmsMessageTypeAnnotationValues = initializeJmsMessageTypeAnnotationMap(); - Sender::Sender(const std::string& brokerUrl, const std::string& jmsMessageType, const Json::Value& testParams) : _brokerUrl(brokerUrl), _jmsMessageType(jmsMessageType), - _testValueMap(testParams[0]), - _testHeadersMap(testParams[1]), - _testPropertiesMap(testParams[2]), + _testValueMap(testParams), _msgsSent(0), _msgsConfirmed(0), _totalMsgs(getTotalNumMessages(_testValueMap)) @@ -85,26 +79,6 @@ namespace qpidit _msgsSent = _msgsConfirmed; } - void Sender::on_connection_error(proton::connection &c) { - std::cerr << "JmsSender::on_connection_error(): " << c.error() << std::endl; - } - - void Sender::on_sender_error(proton::sender &s) { - std::cerr << "JmsSender::on_sender_error(): " << s.error() << std::endl; - } - - void Sender::on_session_error(proton::session &s) { - std::cerr << "JmsSender::on_session_error(): " << s.error() << std::endl; - } - - void Sender::on_transport_error(proton::transport &t) { - std::cerr << "JmsSender::on_transport_error(): " << t.error() << std::endl; - } - - void Sender::on_error(const proton::error_condition &ec) { - std::cerr << "JmsSender::on_error(): " << ec << std::endl; - } - // protected void Sender::sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValues) { @@ -127,8 +101,6 @@ namespace qpidit } else { throw qpidit::UnknownJmsMessageTypeError(_jmsMessageType); } - addMessageHeaders(msg); - addMessageProperties(msg); s.send(msg); _msgsSent += 1; valueNumber += 1; @@ -310,102 +282,6 @@ namespace qpidit return msg; } - proton::message& Sender::addMessageHeaders(proton::message& msg) { - Json::Value::Members headerNames = _testHeadersMap.getMemberNames(); - for (std::vector<std::string>::const_iterator i=headerNames.begin(); i!=headerNames.end(); ++i) { - const Json::Value _subMap = _testHeadersMap[*i]; - const std::string headerValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map - std::string val = _subMap[headerValueType].asString(); - if (i->compare("JMS_TYPE_HEADER") == 0) { - setJmsTypeHeader(msg, val); - } else if (i->compare("JMS_CORRELATIONID_HEADER") == 0) { - if (headerValueType.compare("bytes") == 0) { - setJmsCorrelationId(msg, proton::binary(val)); - } else { - setJmsCorrelationId(msg, val); - } - } else if (i->compare("JMS_REPLYTO_HEADER") == 0) { - setJmsReplyTo(msg, headerValueType, val); - } else { - throw qpidit::UnknownJmsHeaderTypeError(*i); - } - } - return msg; - } - - //static - proton::message& Sender::setJmsTypeHeader(proton::message& msg, const std::string& t) { - msg.subject(t); - return msg; - } - - //static - proton::message& Sender::setJmsCorrelationId(proton::message& msg, const std::string& cid) { - proton::message_id mid(cid); - msg.correlation_id(mid); - msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); - return msg; - } - - //static - proton::message& Sender::setJmsCorrelationId(proton::message& msg, const proton::binary cid) { - proton::message_id mid(cid); - msg.correlation_id(cid); - msg.message_annotations().put(proton::symbol("x-opt-app-correlation-id"), true); - return msg; - } - - //static - proton::message& Sender::setJmsReplyTo(proton::message& msg, const std::string& dts, const std::string& d) { - if (dts.compare("queue") == 0) { - msg.reply_to(/*std::string("queue://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_QUEUE)); - } else if (dts.compare("temp_queue") == 0) { - msg.reply_to(/*std::string("queue://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TMEP_QUEUE)); - } else if (dts.compare("topic") == 0) { - msg.reply_to(/*std::string("topic://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TOPIC)); - } else if (dts.compare("temp_topic") == 0) { - msg.reply_to(/*std::string("topic://") + */d); - msg.message_annotations().put(proton::symbol("x-opt-jms-reply-to"), int8_t(JMS_TEMP_TOPIC)); - } else { - throw qpidit::UnknownJmsDestinationTypeError(dts); - } - return msg; - } - - proton::message& Sender::addMessageProperties(proton::message& msg) { - Json::Value::Members propertyNames = _testPropertiesMap.getMemberNames(); - for (std::vector<std::string>::const_iterator i=propertyNames.begin(); i!=propertyNames.end(); ++i) { - const Json::Value _subMap = _testPropertiesMap[*i]; - const std::string propertyValueType = _subMap.getMemberNames()[0]; // There is always only one entry in map - std::string val = _subMap[propertyValueType].asString(); - if (propertyValueType.compare("boolean") == 0) { - if (val.compare("False") == 0) setMessageProperty(msg, *i, false); - else if (val.compare("True") == 0) setMessageProperty(msg, *i, true); - else throw InvalidTestValueError(propertyValueType, val); - } else if (propertyValueType.compare("byte") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int8_t>(val)); - } else if (propertyValueType.compare("double") == 0) { - setMessageProperty(msg, *i, getFloatValue<double, uint64_t>(val)); - } else if (propertyValueType.compare("float") == 0) { - setMessageProperty(msg, *i, getFloatValue<float, uint64_t>(val)); - } else if (propertyValueType.compare("int") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int32_t>(val)); - } else if (propertyValueType.compare("long") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int64_t>(val)); - } else if (propertyValueType.compare("short") == 0) { - setMessageProperty(msg, *i, getIntegralValue<int16_t>(val)); - } else if (propertyValueType.compare("string") == 0) { - setMessageProperty(msg, *i, val); - } else { - throw qpidit::UnknownJmsPropertyTypeError(propertyValueType); - } - } - return msg; - } - //static proton::binary Sender::getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString) { proton::binary javaObjectBinary; @@ -433,18 +309,6 @@ namespace qpidit return tot; } - //static - std::map<std::string, int8_t> Sender::initializeJmsMessageTypeAnnotationMap() { - std::map<std::string, int8_t> m; - m["JMS_MESSAGE_TYPE"] = JMS_MESSAGE_TYPE; - m["JMS_OBJECTMESSAGE_TYPE"] = JMS_OBJECTMESSAGE_TYPE; - m["JMS_MAPMESSAGE_TYPE"] = JMS_MAPMESSAGE_TYPE; - m["JMS_BYTESMESSAGE_TYPE"] = JMS_BYTESMESSAGE_TYPE; - m["JMS_STREAMMESSAGE_TYPE"] = JMS_STREAMMESSAGE_TYPE; - m["JMS_TEXTMESSAGE_TYPE"] = JMS_TEXTMESSAGE_TYPE; - return m; - } - } /* namespace jms_messages_test */ } /* namespace qpidit */ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp index 5e41120..551edbf 100644 --- a/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp +++ b/shims/qpid-proton-cpp/src/qpidit/jms_messages_test/Sender.hpp @@ -25,8 +25,8 @@ #include "json/value.h" #include "proton/message.hpp" #include "proton/messaging_handler.hpp" +#include "qpidit/JmsTestBase.hpp" #include "qpidit/QpidItErrors.hpp" -#include "qpidit/jms_messages_test/JmsDefinitions.hpp" #include <typeinfo> namespace proton { @@ -38,17 +38,12 @@ namespace qpidit namespace jms_messages_test { - class Sender : public proton::messaging_handler + class Sender : public qpidit::JmsTestBase { protected: - static proton::symbol s_jmsMessageTypeAnnotationKey; - static std::map<std::string, int8_t>s_jmsMessageTypeAnnotationValues; - const std::string _brokerUrl; const std::string _jmsMessageType; const Json::Value _testValueMap; - const Json::Value _testHeadersMap; - const Json::Value _testPropertiesMap; uint32_t _msgsSent; uint32_t _msgsConfirmed; uint32_t _totalMsgs; @@ -60,12 +55,6 @@ namespace qpidit void on_sendable(proton::sender &s); void on_tracker_accept(proton::tracker &t); void on_transport_close(proton::transport &t); - - void on_connection_error(proton::connection &c); - void on_session_error(proton::session &s); - void on_sender_error(proton::sender& s); - void on_transport_error(proton::transport &t); - void on_error(const proton::error_condition &c); protected: void sendMessages(proton::sender &s, const std::string& subType, const Json::Value& testValueMap); proton::message& setMessage(proton::message& msg, const std::string& subType, const std::string& testValueStr); @@ -75,23 +64,9 @@ namespace qpidit proton::message& setStreamMessage(proton::message& msg, const std::string& subType, const std::string& testValue); proton::message& setTextMessage(proton::message& msg, const Json::Value& testValue); - proton::message& addMessageHeaders(proton::message& msg); - static proton::message& setJmsTypeHeader(proton::message& msg, const std::string& t); - static proton::message& setJmsCorrelationId(proton::message& msg, const std::string& cid); - static proton::message& setJmsCorrelationId(proton::message& msg, const proton::binary cid); - static proton::message& setJmsReplyTo(proton::message& msg, const std::string& dt, const std::string& d); - - proton::message& addMessageProperties(proton::message& msg); - template<typename T> proton::message& setMessageProperty(proton::message& msg, const std::string& propertyName, T val) { - msg.properties().put(propertyName, val); - return msg; - } - static proton::binary getJavaObjectBinary(const std::string& javaClassName, const std::string& valAsString); static uint32_t getTotalNumMessages(const Json::Value& testValueMap); - static std::map<std::string, int8_t> initializeJmsMessageTypeAnnotationMap(); - template<typename T> static T numToBinary(T n, proton::binary& b) { for (int i=0; i<sizeof(n); ++i) { b.push_back(* ((char*)&n + i)); http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-python/src/amqp_types_test/Receiver.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/amqp_types_test/Receiver.py b/shims/qpid-proton-python/src/amqp_types_test/Receiver.py index 2876f51..7bb6355 100755 --- a/shims/qpid-proton-python/src/amqp_types_test/Receiver.py +++ b/shims/qpid-proton-python/src/amqp_types_test/Receiver.py @@ -26,13 +26,14 @@ AMQP type test receiver shim for qpid-interop-test # Issues: # * Capturing errors from client or broker -import sys from json import dumps -from proton.handlers import MessagingHandler -from proton.reactor import Container -from traceback import format_exc from string import digits, letters, punctuation from struct import pack, unpack +import sys +from traceback import format_exc + +from proton.handlers import MessagingHandler +from proton.reactor import Container class AmqpTypesReceiverShim(MessagingHandler): """ http://git-wip-us.apache.org/repos/asf/qpid-interop-test/blob/514bac75/shims/qpid-proton-python/src/amqp_types_test/Sender.py ---------------------------------------------------------------------- diff --git a/shims/qpid-proton-python/src/amqp_types_test/Sender.py b/shims/qpid-proton-python/src/amqp_types_test/Sender.py index 19d183f..b760467 100755 --- a/shims/qpid-proton-python/src/amqp_types_test/Sender.py +++ b/shims/qpid-proton-python/src/amqp_types_test/Sender.py @@ -26,16 +26,17 @@ AMQP type test sender shim for qpid-interop-test # Issues: # * Capturing errors from client or broker -import sys -import os.path from json import loads +import os.path +from struct import unpack +import sys +from traceback import format_exc +from uuid import UUID + from proton import byte, char, decimal32, decimal64, decimal128, float32, int32, Message, short, symbol, timestamp, \ ubyte, uint, ulong, ushort from proton.handlers import MessagingHandler from proton.reactor import Container -from struct import unpack -from traceback import format_exc -from uuid import UUID class AmqpTypesSenderShim(MessagingHandler): """ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
