Repository: qpid-broker-j
Updated Branches:
  refs/heads/master d12b40a4d -> dbaff6090


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java
----------------------------------------------------------------------
diff --git 
a/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java
 
b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java
new file mode 100644
index 0000000..d6256cf
--- /dev/null
+++ 
b/systests/end-to-end-conversion-tests/src/main/java/org/apache/qpid/systests/end_to_end_conversion/utils/LoggingOutputStream.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.systests.end_to_end_conversion.utils;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+
+/*
+ * Original code by Jim Moore
+ * See: https://www.mail-archive.com/user@slf4j.org/msg00673.html
+ * Adapted for Qpid needs.
+ */
+
+/**
+ * An OutputStream that flushes out to a Category.<p>
+ * <p/>
+ * Note that no data is written out to the Category until the stream is
+ * flushed or closed.<p>
+ * <p/>
+ * Example:<pre>
+ * // make sure everything sent to System.err is logged
+ * System.setErr(new PrintStream(new
+ * LoggingOutputStream(Logger.getRootCategory(),
+ * Level.WARN), true));
+ * <p/>
+ * // make sure everything sent to System.out is also logged
+ * System.setOut(new PrintStream(new
+ * LoggingOutputStream(Logger.getRootCategory(),
+ * Level.INFO), true));
+ * </pre>
+ *
+ * @author <a href="[EMAIL PROTECTED]">Jim Moore</a>
+ */
+
+//
+public class LoggingOutputStream extends OutputStream
+{
+    /**
+     * Platform-dependent line separator
+     */
+    private static final byte[] LINE_SEPARATOR_BYTES = 
System.getProperty("line.separator").getBytes();
+    /**
+     * The default number of bytes in the buffer. =2048
+     */
+    private static final int DEFAULT_BUFFER_LENGTH = 2048;
+    /**
+     * Used to maintain the contract of {@link #close()}.
+     */
+    private boolean hasBeenClosed = false;
+    /**
+     * The internal buffer where data is stored.
+     */
+    private byte[] buf;
+    /**
+     * The number of valid bytes in the buffer. This value is always
+     * in the range <tt>0</tt> through <tt>buf.length</tt>; elements
+     * <tt>buf[0]</tt> through <tt>buf[count-1]</tt> contain valid
+     * byte data.
+     */
+    private int count;
+    /**
+     * Remembers the size of the buffer for speed.
+     */
+    private int bufLength;
+    /**
+     * The logger to write to.
+     */
+    private Logger logger;
+
+    /**
+     * The level to use when writing to the logger.
+     */
+    private Level level;
+
+    /**
+     * Creates the LoggingOutputStream to flush to the given Category.
+     *
+     * @param log   the Logger to write to
+     * @param level the Level to use when writing to the Logger
+     * @throws IllegalArgumentException if log == null or level ==
+     *                                  null
+     */
+    public LoggingOutputStream(Logger log, Level level) throws 
IllegalArgumentException
+    {
+        if (log == null)
+        {
+            throw new IllegalArgumentException("cat == null");
+        }
+        if (level == null)
+        {
+            throw new IllegalArgumentException("priority == null");
+        }
+
+        this.level = level;
+
+        logger = log;
+        bufLength = DEFAULT_BUFFER_LENGTH;
+        buf = new byte[DEFAULT_BUFFER_LENGTH];
+        count = 0;
+    }
+
+
+    /**
+     * Closes this output stream and releases any system resources
+     * associated with this stream. The general contract of
+     * <code>close</code>
+     * is that it closes the output stream. A closed stream cannot
+     * perform
+     * output operations and cannot be reopened.
+     */
+    public void close()
+    {
+        flush();
+        hasBeenClosed = true;
+    }
+
+
+    /**
+     * Writes the specified byte to this output stream. The general
+     * contract for <code>write</code> is that one byte is written
+     * to the output stream. The byte to be written is the eight
+     * low-order bits of the argument <code>b</code>. The 24
+     * high-order bits of <code>b</code> are ignored.
+     *
+     * @param b the <code>byte</code> to write
+     * @throws java.io.IOException if an I/O error occurs. In particular, an
+     *                             <code>IOException</code> may be
+     *                             thrown if the output stream has been closed.
+     */
+    public void write(final int b) throws IOException
+    {
+        if (hasBeenClosed)
+        {
+            throw new IOException("The stream has been closed.");
+        }
+
+        // would this be writing past the buffer?
+
+        if (count == bufLength)
+        {
+            // grow the buffer
+            final int newBufLength = bufLength + DEFAULT_BUFFER_LENGTH;
+            final byte[] newBuf = new byte[newBufLength];
+
+            System.arraycopy(buf, 0, newBuf, 0, bufLength);
+            buf = newBuf;
+
+            bufLength = newBufLength;
+        }
+
+        buf[count] = (byte) b;
+
+        count++;
+
+        if (endsWithNewLine())
+        {
+            flush();
+        }
+    }
+
+    private boolean endsWithNewLine()
+    {
+        if (count >= LINE_SEPARATOR_BYTES.length)
+        {
+            for (int i = 0; i < LINE_SEPARATOR_BYTES.length; i++)
+            {
+                if (buf[count - LINE_SEPARATOR_BYTES.length + i] != 
LINE_SEPARATOR_BYTES[i])
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
+
+    /**
+     * Flushes this output stream and forces any buffered output bytes
+     * to be written out. The general contract of <code>flush</code> is
+     * that calling it is an indication that, if any bytes previously
+     * written have been buffered by the implementation of the output
+     * stream, such bytes should immediately be written to their
+     * intended destination.
+     */
+    public void flush()
+    {
+
+        if (count == 0)
+        {
+            return;
+        }
+
+        // don't print out blank lines; flushing from PrintStream puts out these
+
+        // For linux system
+
+        if (count == 1 && ((char) buf[0]) == '\n')
+        {
+            reset();
+            return;
+        }
+
+        // For mac system
+
+        if (count == 1 && ((char) buf[0]) == '\r')
+        {
+            reset();
+            return;
+        }
+
+        // On windows system
+
+        if (count == 2 && (char) buf[0] == '\r' && (char) buf[1] == '\n')
+        {
+            reset();
+            return;
+        }
+
+        while (endsWithNewLine())
+        {
+            count -= LINE_SEPARATOR_BYTES.length;
+        }
+        final byte[] theBytes = new byte[count];
+        System.arraycopy(buf, 0, theBytes, 0, count);
+        final String message = new String(theBytes);
+        switch (level)
+        {
+            case ERROR:
+                logger.error(message);
+                break;
+            case WARN:
+                logger.warn(message);
+                break;
+            case INFO:
+                logger.info(message);
+                break;
+            case DEBUG:
+                logger.debug(message);
+                break;
+            case TRACE:
+                logger.trace(message);
+                break;
+        }
+        reset();
+    }
+
+    private void reset()
+    {
+        // not resetting the buffer -- assuming that if it grew then it will likely grow similarly again
+        count = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0822e1a8/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
----------------------------------------------------------------------
diff --git 
a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
 
b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
index 79812f0..a4f4f76 100644
--- 
a/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
+++ 
b/systests/end-to-end-conversion-tests/src/test/java/org/apache/qpid/systests/end_to_end_conversion/SimpleConversionTest.java
@@ -20,29 +20,57 @@
 
 package org.apache.qpid.systests.end_to_end_conversion;
 
-import static org.junit.Assert.fail;
+import static org.junit.Assume.assumeTrue;
 
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.systests.end_to_end_conversion.client.ClientInstruction;
+import 
org.apache.qpid.systests.end_to_end_conversion.client.MessageDescription;
 import 
org.apache.qpid.systests.end_to_end_conversion.client.VerificationException;
 
 public class SimpleConversionTest extends EndToEndConversionTestBase
 {
     private static final long TEST_TIMEOUT = 30000L;
+    public static final String QUEUE_NAME = "testQueue";
+    public static final String REPLY_QUEUE_NAME = "testReplyQueue";
+    private static final String QUEUE_JNDI_NAME = "queue";
+    private static final String REPLY_QUEUE_JNDI_NAME = "replyQueue";
+
+
+    private HashMap<String, String> _defaultDestinations;
+
+    @Before
+    public void setup()
+    {
+        getBrokerAdmin().createQueue(QUEUE_NAME);
+        getBrokerAdmin().createQueue(REPLY_QUEUE_NAME);
+
+        _defaultDestinations = new HashMap<>();
+        _defaultDestinations.put("queue." + QUEUE_JNDI_NAME, QUEUE_NAME);
+        _defaultDestinations.put("queue." + REPLY_QUEUE_JNDI_NAME, 
REPLY_QUEUE_NAME);
+/*
+        destinations.put("topic.topic", "testTopic");
+        destinations.put("topic.replyTopic", "testReplyTopic");
+        destinations.put("destination.destination", "testDestination");
+        destinations.put("destination.replyDestination", 
"testReplyDestination");
+*/
+    }
 
     @Test
     public void textMessage() throws Exception
     {
-        final JmsInstructions.MessageDescription messageDescription = new 
JmsInstructions.MessageDescription();
-        
messageDescription.setMessageType(JmsInstructions.MessageDescription.MessageType.TEXT_MESSAGE);
+        final MessageDescription messageDescription = new MessageDescription();
+        
messageDescription.setMessageType(MessageDescription.MessageType.TEXT_MESSAGE);
         messageDescription.setContent("foobar");
 
         performSimpleTest(messageDescription);
@@ -51,8 +79,8 @@ public class SimpleConversionTest extends 
EndToEndConversionTestBase
     @Test
     public void bytesMessage() throws Exception
     {
-        final JmsInstructions.MessageDescription messageDescription = new 
JmsInstructions.MessageDescription();
-        
messageDescription.setMessageType(JmsInstructions.MessageDescription.MessageType.BYTES_MESSAGE);
+        final MessageDescription messageDescription = new MessageDescription();
+        
messageDescription.setMessageType(MessageDescription.MessageType.BYTES_MESSAGE);
         messageDescription.setContent(new byte[]{0x00, (byte) 0xFF, (byte) 
0xc3});
 
         performSimpleTest(messageDescription);
@@ -61,8 +89,8 @@ public class SimpleConversionTest extends 
EndToEndConversionTestBase
     @Test
     public void mapMessage() throws Exception
     {
-        final JmsInstructions.MessageDescription messageDescription = new 
JmsInstructions.MessageDescription();
-        
messageDescription.setMessageType(JmsInstructions.MessageDescription.MessageType.MAP_MESSAGE);
+        final MessageDescription messageDescription = new MessageDescription();
+        
messageDescription.setMessageType(MessageDescription.MessageType.MAP_MESSAGE);
         HashMap<String, Object> content = new HashMap<>();
         content.put("int", 42);
         content.put("boolean", true);
@@ -73,22 +101,36 @@ public class SimpleConversionTest extends 
EndToEndConversionTestBase
     }
 
     @Test
+    public void type() throws Exception
+    {
+        final String type = "testType";
+        final MessageDescription messageDescription = new MessageDescription();
+        messageDescription.setHeader(MessageDescription.MessageHeader.TYPE, 
type);
+
+        performSimpleTest(messageDescription);
+    }
+
+    @Test
     public void correlationId() throws Exception
     {
         final String correlationId = "myCorrelationId";
-        final JmsInstructions.MessageDescription messageDescription = new 
JmsInstructions.MessageDescription();
-        
messageDescription.setHeader(JmsInstructions.MessageDescription.MessageHeader.CORRELATION_ID,
 correlationId);
+        final MessageDescription messageDescription = new MessageDescription();
+        
messageDescription.setHeader(MessageDescription.MessageHeader.CORRELATION_ID, 
correlationId);
 
         performSimpleTest(messageDescription);
     }
 
-    @Ignore("QPID-7897")
     @Test
     public void correlationIdAsBytes() throws Exception
     {
+        assumeTrue("This test is known to fail for pre 0-10 subscribers 
(QPID-7897)",
+                   EnumSet.of(Protocol.AMQP_0_10, 
Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion()));
+        assumeTrue("This test is known to fail for pre 0-10 subscribers 
(QPID-7899)",
+                   EnumSet.of(Protocol.AMQP_0_10, 
Protocol.AMQP_1_0).contains(getPublisherProtocolVersion()));
+
         final byte[] correlationId = new byte[]{(byte) 0xFF, 0x00, (byte) 
0xC3};
-        final JmsInstructions.MessageDescription messageDescription = new 
JmsInstructions.MessageDescription();
-        
messageDescription.setHeader(JmsInstructions.MessageDescription.MessageHeader.CORRELATION_ID,
 correlationId);
+        final MessageDescription messageDescription = new MessageDescription();
+        
messageDescription.setHeader(MessageDescription.MessageHeader.CORRELATION_ID, 
correlationId);
 
         performSimpleTest(messageDescription);
     }
@@ -96,7 +138,7 @@ public class SimpleConversionTest extends 
EndToEndConversionTestBase
     @Test
     public void property() throws Exception
     {
-        final JmsInstructions.MessageDescription messageDescription = new 
JmsInstructions.MessageDescription();
+        final MessageDescription messageDescription = new MessageDescription();
         messageDescription.setProperty("intProperty", 42);
         messageDescription.setProperty("stringProperty", "foobar");
         messageDescription.setProperty("booleanProperty", true);
@@ -105,12 +147,61 @@ public class SimpleConversionTest extends 
EndToEndConversionTestBase
         performSimpleTest(messageDescription);
     }
 
-    public void performSimpleTest(final JmsInstructions.MessageDescription 
messageDescription) throws Exception
+    @Test
+    public void replyTo() throws Exception
+    {
+        performReplyToTest(REPLY_QUEUE_JNDI_NAME);
+    }
+
+    @Test
+    public void replyToTemporaryQueue() throws Exception
+    {
+        performReplyToTest(TEMPORARY_QUEUE_JNDI_NAME);
+    }
+
+    public void performReplyToTest(final String temporaryQueueJndiName) throws 
Exception
+    {
+        assumeTrue("This test is known to fail for pre 0-10 subscribers 
(QPID-7898)",
+                   EnumSet.of(Protocol.AMQP_0_10, 
Protocol.AMQP_1_0).contains(getSubscriberProtocolVersion()));
+
+        final String correlationId = "testCorrelationId";
+        final String destinationJndiName = QUEUE_JNDI_NAME;
+
+        final List<ClientInstruction>
+                publisherInstructions = new 
ClientInstructionBuilder().configureDestinations(_defaultDestinations)
+                                                                      
.publishMessage(destinationJndiName)
+                                                                      
.withReplyToJndiName(
+                                                                               
                     temporaryQueueJndiName)
+                                                                      
.withHeader(MessageDescription.MessageHeader.CORRELATION_ID,
+                                                                               
   correlationId)
+                                                                      .build();
+        final List<ClientInstruction> subscriberInstructions = new 
ClientInstructionBuilder().configureDestinations(_defaultDestinations)
+                                                                               
              .receiveMessage(destinationJndiName)
+                                                                               
              .withHeader(MessageDescription.MessageHeader.CORRELATION_ID,
+                                                                               
                          correlationId)
+                                                                               
              .build();
+        performTest(publisherInstructions, subscriberInstructions);
+    }
+
+    public void performSimpleTest(final MessageDescription messageDescription) 
throws Exception
+    {
+        final String destinationJndiName = QUEUE_JNDI_NAME;
+        final List<ClientInstruction> publisherInstructions =
+                new 
ClientInstructionBuilder().configureDestinations(_defaultDestinations)
+                                              
.publishMessage(destinationJndiName, messageDescription)
+                                              .build();
+        final List<ClientInstruction> subscriberInstructions =
+                new 
ClientInstructionBuilder().configureDestinations(_defaultDestinations)
+                                              
.receiveMessage(destinationJndiName, messageDescription)
+                                              .build();
+        performTest(publisherInstructions,subscriberInstructions);
+    }
+
+    public void performTest(final List<ClientInstruction> 
publisherInstructions,
+                            final List<ClientInstruction> 
subscriberInstructions) throws Exception
     {
-        final ListenableFuture<?> publisherFuture =
-                
runPublisher(JmsInstructionBuilder.publishSingleMessage(messageDescription));
-        final ListenableFuture<?> subscriberFuture =
-                
runSubscriber(JmsInstructionBuilder.receiveSingleMessage(messageDescription));
+        final ListenableFuture<?> publisherFuture = 
runPublisher(publisherInstructions);
+        final ListenableFuture<?> subscriberFuture = 
runSubscriber(subscriberInstructions);
         try
         {
             Futures.allAsList(publisherFuture, 
subscriberFuture).get(TEST_TIMEOUT, TimeUnit.MILLISECONDS);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to