Repository: qpid-broker-j Updated Branches: refs/heads/master d12b40a4d -> dbaff6090
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java ---------------------------------------------------------------------- diff --git a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java new file mode 100644 index 0000000..d6256cf --- /dev/null +++ b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.systests.end_to_end_conversion.utils; + +import java.io.IOException; +import java.io.OutputStream; + +import org.slf4j.Logger; +import org.slf4j.event.Level; + +/* + * Original code by Jim Moore + * See: https://www.mail-archive.com/user@slf4j.org/msg00673.html + * Adapted for Qpid needs. + */ + +/** + * An OutputStream that flushes out to a Category.<p> + * <p/> + * Note that no data is written out to the Category until the stream is + * flushed or closed.<p> + * <p/> + * Example:<pre> + * // make sure everything sent to System.err is logged + * System.setErr(new PrintStream(new + * LoggingOutputStream(Logger.getRootCategory(), + * Level.WARN), true)); + * <p/> + * // make sure everything sent to System.out is also logged + * System.setOut(new PrintStream(new + * LoggingOutputStream(Logger.getRootCategory(), + * Level.INFO), true)); + * </pre> + * + * @author <a href="[EMAIL PROTECTED]">Jim Moore</a> + */ + +// +public class LoggingOutputStream extends OutputStream +{ + /** + * Platform dependant line separator + */ + private static final byte[] LINE_SEPARATOR_BYTES = System.getProperty("line.separator").getBytes(); + /** + * The default number of bytes in the buffer. =2048 + */ + private static final int DEFAULT_BUFFER_LENGTH = 2048; + /** + * Used to maintain the contract of [EMAIL PROTECTED] #close()}. + */ + private boolean hasBeenClosed = false; + /** + * The internal buffer where data is stored. + */ + private byte[] buf; + /** + * The number of valid bytes in the buffer. This value is always + * in the range <tt>0</tt> through <tt>buf.length</tt>; elements + * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid + * byte data. + */ + private int count; + /** + * Remembers the size of the buffer for speed. + */ + private int bufLength; + /** + * The category to write to. + */ + private Logger logger; + + /** + * The priority to use when writing to the Category. + */ + private Level level; + + /** + * Creates the LoggingOutputStream to flush to the given Category. + * + * @param log the Logger to write to + * @param level the Level to use when writing to the Logger + * @throws IllegalArgumentException if cat == null or priority == + * null + */ + public LoggingOutputStream(Logger log, Level level) throws IllegalArgumentException + { + if (log == null) + { + throw new IllegalArgumentException("cat == null"); + } + if (level == null) + { + throw new IllegalArgumentException("priority == null"); + } + + this.level = level; + + logger = log; + bufLength = DEFAULT_BUFFER_LENGTH; + buf = new byte[DEFAULT_BUFFER_LENGTH]; + count = 0; + } + + + /** + * Closes this output stream and releases any system resources + * associated with this stream. The general contract of + * <code>close</code> + * is that it closes the output stream. A closed stream cannot + * perform + * output operations and cannot be reopened. + */ + public void close() + { + flush(); + hasBeenClosed = true; + } + + + /** + * Writes the specified byte to this output stream. The general + * contract for <code>write</code> is that one byte is written + * to the output stream. The byte to be written is the eight + * low-order bits of the argument <code>b</code>. The 24 + * high-order bits of <code>b</code> are ignored. + * + * @param b the <code>byte</code> to write + * @throws java.io.IOException if an I/O error occurs. In particular, an + * <code>IOException</code> may be + * thrown if the output stream has been closed. + */ + public void write(final int b) throws IOException + { + if (hasBeenClosed) + { + throw new IOException("The stream has been closed."); + } + + // would this be writing past the buffer? + + if (count == bufLength) + { + // grow the buffer + final int newBufLength = bufLength + DEFAULT_BUFFER_LENGTH; + final byte[] newBuf = new byte[newBufLength]; + + System.arraycopy(buf, 0, newBuf, 0, bufLength); + buf = newBuf; + + bufLength = newBufLength; + } + + buf[count] = (byte) b; + + count++; + + if (endsWithNewLine()) + { + flush(); + } + } + + private boolean endsWithNewLine() + { + if (count >= LINE_SEPARATOR_BYTES.length) + { + for (int i = 0; i < LINE_SEPARATOR_BYTES.length; i++) + { + if (buf[count - LINE_SEPARATOR_BYTES.length + i] != LINE_SEPARATOR_BYTES[i]) + { + return false; + } + } + return true; + } + else + { + return false; + } + } + + + /** + * Flushes this output stream and forces any buffered output bytes + * to be written out. The general contract of <code>flush</code> is + * that calling it is an indication that, if any bytes previously + * written have been buffered by the implementation of the output + * stream, such bytes should immediately be written to their + * intended destination. + */ + public void flush() + { + + if (count == 0) + { + return; + } + + // don't print out blank lines; flushing from PrintStream puts + + // For linux system + + if (count == 1 && ((char) buf[0]) == '\n') + { + reset(); + return; + } + + // For mac system + + if (count == 1 && ((char) buf[0]) == '\r') + { + reset(); + return; + } + + // On windows system + + if (count == 2 && (char) buf[0] == '\r' && (char) buf[1] == '\n') + { + reset(); + return; + } + + while (endsWithNewLine()) + { + count -= LINE_SEPARATOR_BYTES.length; + } + final byte[] theBytes = new byte[count]; + System.arraycopy(buf, 0, theBytes, 0, count); + final String message = new String(theBytes); + switch (level) + { + case ERROR: + logger.error(message); + break; + case WARN: + logger.warn(message); + break; + case INFO: + logger.info(message); + break; + case DEBUG: + logger.debug(message); + break; + case TRACE: + logger.trace(message); + break; + } + reset(); + } + + private void reset() + { + // not resetting the buffer -- assuming that if it grew then it will likely grow similarly again + count = 0; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java ---------------------------------------------------------------------- diff --git a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java index 79812f0..a4f4f76 100644 --- a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java +++ b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java @@ -20,29 +20,57 @@ package org.apache.qpid.systests.end_to_end_conversion; -import static org.junit.Assert.fail; +import static org.junit.Assume.assumeTrue; +import java.util.EnumSet; import java.util.HashMap; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import org.junit.Ignore; +import org.junit.Before; import org.junit.Test; +import org.apache.qpid.server.model.Protocol; +import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction; +import org.apache.qpid.systests.end_to_end_conversion.client.MessageDescription; import org.apache.qpid.systests.end_to_end_conversion.client.VerificationException; public class SimpleConversionTest extends EndToEndConversionTestBase { private static final long TEST_TIMEOUT = 30000L; + public static final String QUEUE_NAME = "testQueue"; + public static final String REPLY_QUEUE_NAME = "testReplyQueue"; + private static final String QUEUE_JNDI_NAME = "queue"; + private static final String REPLY_QUEUE_JNDI_NAME = "replyQueue"; + + + private HashMap<String, String> _defaultDestinations; + + @Before + public void setup() + { + getBrokerAdmin().createQueue(QUEUE_NAME); + getBrokerAdmin().createQueue(REPLY_QUEUE_NAME); + + _defaultDestinations = new HashMap<>(); + _defaultDestinations.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME); + _defaultDestinations.put("queue." + REPLY_QUEUE_JNDI_NAME, REPLY_QUEUE_NAME); +/* + destinations.put("topic.topic", "testTopic"); + destinations.put("topic.replyTopic", "testReplyTopic"); + destinations.put("destination.destination", "testDestination"); + destinations.put("destination.replyDestination", "testReplyDestination"); +*/ + } @Test public void textMessage() throws Exception { - final JmsInstructions.MessageDescription messageDescription = new JmsInstructions.MessageDescription(); - messageDescription.setMessageType(JmsInstructions.MessageDescription.MessageType.TEXT_MESSAGE); + final MessageDescription messageDescription = new MessageDescription(); + messageDescription.setMessageType(MessageDescription.MessageType.TEXT_MESSAGE); messageDescription.setContent("foobar"); performSimpleTest(messageDescription); @@ -51,8 +79,8 @@ public class SimpleConversionTest extends EndToEndConversionTestBase @Test public void bytesMessage() throws Exception { - final JmsInstructions.MessageDescription messageDescription = new JmsInstructions.MessageDescription(); - messageDescription.setMessageType(JmsInstructions.MessageDescription.MessageType.BYTES_MESSAGE); + final MessageDescription messageDescription = new MessageDescription(); + messageDescription.setMessageType(MessageDescription.MessageType.BYTES_MESSAGE); messageDescription.setContent(new byte[]{0x00, (byte) 0xFF, (byte) 0xc3}); performSimpleTest(messageDescription); @@ -61,8 +89,8 @@ public class SimpleConversionTest extends EndToEndConversionTestBase @Test public void mapMessage() throws Exception { - final JmsInstructions.MessageDescription messageDescription = new JmsInstructions.MessageDescription(); - messageDescription.setMessageType(JmsInstructions.MessageDescription.MessageType.MAP_MESSAGE); + final MessageDescription messageDescription = new MessageDescription(); + messageDescription.setMessageType(MessageDescription.MessageType.MAP_MESSAGE); HashMap<String, Object> content = new HashMap<>(); content.put("int", 42); content.put("boolean", true); @@ -73,22 +101,36 @@ public class SimpleConversionTest extends EndToEndConversionTestBase } @Test + public void type() throws Exception + { + final String type = "testType"; + final MessageDescription messageDescription = new MessageDescription(); + messageDescription.setHeader(MessageDescription.MessageHeader.TYPE, type); + + performSimpleTest(messageDescription); + } + + @Test public void correlationId() throws Exception { final String correlationId = "myCorrelationId"; - final JmsInstructions.MessageDescription messageDescription = new JmsInstructions.MessageDescription(); - messageDescription.setHeader(JmsInstructions.MessageDescription.MessageHeader.CORRELATION_ID, correlationId); + final MessageDescription messageDescription = new MessageDescription(); + messageDescription.setHeader(MessageDescription.MessageHeader.CORRELATION_ID, correlationId); performSimpleTest(messageDescription); } - @Ignore("QPID-7897") @Test public void correlationIdAsBytes() throws Exception { + assumeTrue("This test is known to fail for pre 0-10 subscribers (QPID-7897)", + EnumSet.of(Protocol.AMQP_0_10, Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion())); + assumeTrue("This test is known to fail for pre 0-10 subscribers (QPID-7899)", + EnumSet.of(Protocol.AMQP_0_10, Protocol.AMQP_1_0).contains(getPublisherProtocolVersion())); + final byte[] correlationId = new byte[]{(byte) 0xFF, 0x00, (byte) 0xC3}; - final JmsInstructions.MessageDescription messageDescription = new JmsInstructions.MessageDescription(); - messageDescription.setHeader(JmsInstructions.MessageDescription.MessageHeader.CORRELATION_ID, correlationId); + final MessageDescription messageDescription = new MessageDescription(); + messageDescription.setHeader(MessageDescription.MessageHeader.CORRELATION_ID, correlationId); performSimpleTest(messageDescription); } @@ -96,7 +138,7 @@ public class SimpleConversionTest extends EndToEndConversionTestBase @Test public void property() throws Exception { - final JmsInstructions.MessageDescription messageDescription = new JmsInstructions.MessageDescription(); + final MessageDescription messageDescription = new MessageDescription(); messageDescription.setProperty("intProperty", 42); messageDescription.setProperty("stringProperty", "foobar"); messageDescription.setProperty("booleanProperty", true); @@ -105,12 +147,61 @@ public class SimpleConversionTest extends EndToEndConversionTestBase performSimpleTest(messageDescription); } - public void performSimpleTest(final JmsInstructions.MessageDescription messageDescription) throws Exception + @Test + public void replyTo() throws Exception + { + performReplyToTest(REPLY_QUEUE_JNDI_NAME); + } + + @Test + public void replyToTemporaryQueue() throws Exception + { + performReplyToTest(TEMPORARY_QUEUE_JNDI_NAME); + } + + public void performReplyToTest(final String temporaryQueueJndiName) throws Exception + { + assumeTrue("This test is known to fail for pre 0-10 subscribers (QPID-7898)", + EnumSet.of(Protocol.AMQP_0_10, Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion())); + + final String correlationId = "testCorrelationId"; + final String destinationJndiName = QUEUE_JNDI_NAME; + + final List<ClientInstruction> + publisherInstructions = new ClientInstructionBuilder().configureDestinations(_defaultDestinations) + .publishMessage(destinationJndiName) + .withReplyToJndiName( + temporaryQueueJndiName) + .withHeader(MessageDescription.MessageHeader.CORRELATION_ID, + correlationId) + .build(); + final List<ClientInstruction> subscriberInstructions = new ClientInstructionBuilder().configureDestinations(_defaultDestinations) + .receiveMessage(destinationJndiName) + .withHeader(MessageDescription.MessageHeader.CORRELATION_ID, + correlationId) + .build(); + performTest(publisherInstructions, subscriberInstructions); + } + + public void performSimpleTest(final MessageDescription messageDescription) throws Exception + { + final String destinationJndiName = QUEUE_JNDI_NAME; + final List<ClientInstruction> publisherInstructions = + new ClientInstructionBuilder().configureDestinations(_defaultDestinations) + .publishMessage(destinationJndiName, messageDescription) + .build(); + final List<ClientInstruction> subscriberInstructions = + new ClientInstructionBuilder().configureDestinations(_defaultDestinations) + .receiveMessage(destinationJndiName, messageDescription) + .build(); + performTest(publisherInstructions,subscriberInstructions); + } + + public void performTest(final List<ClientInstruction> publisherInstructions, + final List<ClientInstruction> subscriberInstructions) throws Exception { - final ListenableFuture<?> publisherFuture = - runPublisher(JmsInstructionBuilder.publishSingleMessage(messageDescription)); - final ListenableFuture<?> subscriberFuture = - runSubscriber(JmsInstructionBuilder.receiveSingleMessage(messageDescription)); + final ListenableFuture<?> publisherFuture = runPublisher(publisherInstructions); + final ListenableFuture<?> subscriberFuture = runSubscriber(subscriberInstructions); try { Futures.allAsList(publisherFuture, subscriberFuture).get(TEST_TIMEOUT, TimeUnit.MILLISECONDS); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org