Here are my versions of these test files:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.websocket.server;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;

import javax.websocket.ClientEndpointConfig;
import javax.websocket.ContainerProvider;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;

import org.apache.catalina.Context;
import org.apache.catalina.servlets.DefaultServlet;
import org.apache.catalina.startup.Tomcat;
import org.apache.catalina.startup.TomcatBaseTest;
import org.apache.tomcat.websocket.TesterAsyncTiming;
import 
org.apache.tomcat.websocket.TesterMessageCountClient.TesterProgrammaticEndpoint;
import org.junit.Assert;
import org.junit.Test;

//@Ignore // Test passes but GC delays can introduce false failures.
public class TestAsyncMessages extends TomcatBaseTest {

    @Test
    public void testAsyncTiming() throws Exception {

        Tomcat tomcat = getTomcatInstance();
        // No file system docBase required
        Context ctx = tomcat.addContext("", null);
        ctx.addApplicationListener(TesterAsyncTiming.Config.class.getName());
        DefaultServlet defaultServlet = new DefaultServlet();
        WebSocketContainer wsContainer =
                ContainerProvider.getWebSocketContainer();

        Tomcat.addServlet(ctx, "default", defaultServlet);
        ctx.addServletMappingDecoded("/", "default");

        ctx.addParameter(Constants.
                BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "" + 
TesterAsyncTiming.Config.LARGE_DATA_SIZE);
        ctx.addParameter(org.apache.tomcat.websocket.server.Constants.
                TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "" + 
TesterAsyncTiming.Config.LARGE_DATA_SIZE);
        
wsContainer.setDefaultMaxBinaryMessageBufferSize(TesterAsyncTiming.Config.LARGE_DATA_SIZE);
        
wsContainer.setDefaultMaxTextMessageBufferSize(TesterAsyncTiming.Config.LARGE_DATA_SIZE);

        tomcat.start();

        ClientEndpointConfig clientEndpointConfig = 
ClientEndpointConfig.Builder.create().build();
        Session wsSession = wsContainer.connectToServer(
                TesterProgrammaticEndpoint.class,
                clientEndpointConfig,
                new URI("ws://localhost:" + getPort() + 
TesterAsyncTiming.Config.PATH));

        AsyncTimingClientHandler handler = new AsyncTimingClientHandler();
        wsSession.addMessageHandler(ByteBuffer.class, handler);
        wsSession.getBasicRemote().sendText("Hello");

        System.out.println("Sent Hello message, waiting for data");
        handler.waitForLatch();
        Assert.assertFalse(handler.hasFailed());
    }

    private static class AsyncTimingClientHandler implements 
MessageHandler.Partial<ByteBuffer> {

        private long lastMessage = 0;
        private int sequence = 0;
        private int count = 0;
        private CountDownLatch latch = new CountDownLatch(1);
        private volatile boolean fail = false;
        private long minDelayLong = TesterAsyncTiming.Config.SLEEP_MILLI - 20;
        private long maxDelayLong = TesterAsyncTiming.Config.SLEEP_MILLI + 20;
        private long maxDelayShort = 20;

        @Override
        public void onMessage(ByteBuffer message, boolean last) {
            if (lastMessage == 0) {
                // First message. Don't check
                sequence ++;
                lastMessage = System.currentTimeMillis();
            } else {
                long newTime = System.currentTimeMillis();
                long diff = newTime - lastMessage;
                lastMessage = newTime;
                if (sequence == 0) {
                    sequence++;
                    if (message.capacity() != 
TesterAsyncTiming.Config.LARGE_DATA_SIZE) {
                        System.out.println(
                                "Expected size " + 
TesterAsyncTiming.Config.LARGE_DATA_SIZE + " but was [" + message
                                        .capacity() + "], count [" + count + 
"]");
                        fail = true;
                    }
                    if (diff < minDelayLong) {
                        System.out.println(
                                "Expected diff > " + minDelayLong + " ms but 
was [" + diff + "], count [" + count
                                        + "]");
                        fail = true;
                    } else if (diff > maxDelayLong) {
                        System.out.println(
                                "Expected diff < " + maxDelayLong + " ms but 
was [" + diff + "], count [" + count
                                        + "]");
                        fail = true;
                    }
                } else if (sequence == 1) {
                    sequence = 0;
                    if (message.capacity() != 
TesterAsyncTiming.Config.SMALL_DATA_SIZE) {
                        System.out.println(
                                "Expected size " + 
TesterAsyncTiming.Config.SMALL_DATA_SIZE + " but was [" + message
                                        .capacity() + "], count [" + count + 
"]");
                        fail = true;
                    }
                    if (diff > maxDelayShort) {
                        System.out.println(
                                "Expected diff < " + maxDelayShort + " ms but 
was [" + diff + "], count [" + count
                                        + "]");
                        fail = true;
                    }
                }
            }
            count ++;
            if (count >= TesterAsyncTiming.Config.ITERATIONS * 2) {
                latch.countDown();
            }
        }

        public void waitForLatch() throws InterruptedException {
            latch.await();
        }

        public boolean hasFailed() {
            return fail;
        }
    }
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.tomcat.websocket;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.Semaphore;

import javax.websocket.OnMessage;
import javax.websocket.RemoteEndpoint.Async;
import javax.websocket.SendHandler;
import javax.websocket.SendResult;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import org.apache.tomcat.websocket.server.TesterEndpointConfig;

/**
 * Server side of the asynchronous message timing test. On receipt of a text
 * message the endpoint sends {@link Config#ITERATIONS} pairs of binary
 * messages (one large immediately followed by one small), pausing
 * {@link Config#SLEEP_MILLI} milliseconds between pairs.
 */
public class TesterAsyncTiming {

    public static class Config extends TesterEndpointConfig {

        public static final String PATH = "/timing";
        public static final int ITERATIONS = 100;
        public static final int SLEEP_MILLI = 100;
        public static final int LARGE_DATA_SIZE = 16 * 1024;
        public static final int SMALL_DATA_SIZE = 1904;

        @Override
        protected Class<?> getEndpointClass() {
            return Endpoint.class;
        }
    }

    @ServerEndpoint(Config.PATH)
    public static class Endpoint {

        // NOTE(review): these buffers are static and therefore shared by all
        // Endpoint instances; this looks safe only while a single session
        // uses the endpoint at a time - confirm before reusing elsewhere.
        private static final ByteBuffer LARGE_DATA =
                ByteBuffer.allocate(Config.LARGE_DATA_SIZE);
        private static final ByteBuffer SMALL_DATA =
                ByteBuffer.allocate(Config.SMALL_DATA_SIZE);

        /**
         * Sends the large / small message pairs to the client. The content
         * of the incoming message is ignored; it only acts as a trigger.
         *
         * @param session the session to send the binary messages to
         * @param text    the trigger message (unused)
         * @throws IllegalStateException if the thread is interrupted while
         *         waiting for a send to complete or while sleeping
         */
        @OnMessage
        public void onMessage(Session session,
                @SuppressWarnings("unused") String text) {

            // A single permit ensures that the next send is not started
            // until the handler has been notified of the previous result
            Semaphore semaphore = new Semaphore(1);
            SendHandler handler = new SemaphoreSendHandler(semaphore);

            Async remote = session.getAsyncRemote();
            try {
                for (int i = 0; i < Config.ITERATIONS; i++) {
                    if (i > 0) {
                        Thread.sleep(Config.SLEEP_MILLI);
                        // Make the buffers readable again. flip() sets the
                        // limit to the current position, so this relies on
                        // the previous sends having consumed each buffer
                        // completely.
                        LARGE_DATA.flip();
                        SMALL_DATA.flip();
                    }
                    semaphore.acquire(1);
                    remote.sendBinary(LARGE_DATA, handler);
                    semaphore.acquire(1);
                    remote.sendBinary(SMALL_DATA, handler);
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack
                // can still observe the interruption
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }

        /**
         * Releases a permit on the supplied semaphore each time a send
         * result is reported.
         */
        private static final class SemaphoreSendHandler implements SendHandler {

            private final Semaphore semaphore;

            private SemaphoreSendHandler(Semaphore semaphore) {
                this.semaphore = semaphore;
            }

            @Override
            public void onResult(SendResult result) {
                semaphore.release();
            }
        }
    }
}

-Harri

-----Original Message-----
From: Pesonen, Harri [mailto:harri.peso...@sap.com] 
Sent: 8. maaliskuuta 2017 16:32
To: Tomcat Users List <users@tomcat.apache.org>
Subject: RE: Tomcat WebSocket does not always send asynchronous messages

Hello, and sorry for top-posting, I don't know how to configure Outlook to do 
it differently.

I was finally able to run your test. I had a lot of trouble doing it:
* did not have SVN, downloaded TortoiseSVN
* tried to open the project in IDEA, but failed miserably; I really wish 
there were a pom.xml
* was able to build the whole of Tomcat and run the tests from the ant command 
line, but it took so long that I had to abort
* was not able to run this single test with ant:

Testsuite: org.apache.tomcat.websocket.server.TestAsyncMessages.java
Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 0 sec

        Caused an ERROR
org.apache.tomcat.websocket.server.TestAsyncMessages.java
java.lang.ClassNotFoundException: 
org.apache.tomcat.websocket.server.TestAsyncMessages.java

* but was able to make the Eclipse project with "ant ide-eclipse"
* was able to run the unit test in Eclipse:

08-Mar-2017 14:14:40.538 INFO [main] 
org.apache.catalina.startup.LoggingBaseTest.setUp Starting test case 
[testAsyncTiming]
08-Mar-2017 14:14:42.676 INFO [main] org.apache.coyote.AbstractProtocol.init 
Initializing ProtocolHandler ["http-nio-127.0.0.1-auto-1"]
08-Mar-2017 14:14:42.778 INFO [main] 
org.apache.tomcat.util.net.NioSelectorPool.getSharedSelector Using a shared 
selector for servlet write/read
08-Mar-2017 14:14:42.800 INFO [main] 
org.apache.catalina.core.StandardService.startInternal Starting service Tomcat
08-Mar-2017 14:14:42.802 INFO [main] 
org.apache.catalina.core.StandardEngine.startInternal Starting Servlet Engine: 
Apache Tomcat/@VERSION@
08-Mar-2017 14:14:43.213 INFO [main] org.apache.coyote.AbstractProtocol.start 
Starting ProtocolHandler [http-nio-127.0.0.1-auto-1-54783]
Sent Hello message, waiting for data
Expected diff < 500,000 but was [6054390], count [2]
Expected diff < 500,000 but was [1015710], count [14]
Expected diff < 500,000 but was [642270], count [25]
Expected diff < 500,000 but was [1712852], count [26]
Expected diff < 500,000 but was [595293], count [41]
Expected diff < 500,000 but was [792673], count [61]
Expected diff < 500,000 but was [799777], count [62]
Expected diff < 500,000 but was [531738], count [68]
Expected diff < 500,000 but was [532922], count [76]
Expected diff < 500,000 but was [673851], count [98]
Expected diff < 500,000 but was [538054], count [133]
Expected diff < 500,000 but was [747276], count [158]
Expected diff < 500,000 but was [794646], count [262]
Expected diff < 500,000 but was [1290461], count [263]
Expected diff < 500,000 but was [1013341], count [296]
Expected diff < 500,000 but was [582267], count [311]
Expected diff < 500,000 but was [1377703], count [337]
Expected diff < 500,000 but was [1698245], count [338]
Expected diff < 500,000 but was [1303488], count [424]
Expected diff < 500,000 but was [965181], count [425]
Expected diff < 500,000 but was [534896], count [455]
Expected diff < 500,000 but was [847938], count [458]
Expected diff < 500,000 but was [883862], count [473]
Expected diff < 500,000 but was [1026368], count [475]
Expected diff < 500,000 but was [1096241], count [476]
Expected diff < 500,000 but was [518710], count [481]
Expected diff < 500,000 but was [1053607], count [482]
Expected diff < 500,000 but was [641481], count [500]
Expected diff < 500,000 but was [565292], count [512]
Expected diff < 500,000 but was [808857], count [556]
Expected diff < 500,000 but was [643455], count [653]
Expected diff < 500,000 but was [508447], count [670]
Expected diff < 500,000 but was [960839], count [671]
Expected diff < 500,000 but was [954918], count [683]
Expected diff < 500,000 but was [601215], count [749]
Expected diff < 500,000 but was [561345], count [752]
Expected diff < 500,000 but was [688062], count [935]
Expected diff < 500,000 but was [1405730], count [937]
Expected diff < 500,000 but was [1414415], count [938]
Expected diff < 500,000 but was [1284935], count [941]
Expected diff < 500,000 but was [516737], count [995]
Expected diff < 500,000 but was [587398], count [1067]
Expected diff < 500,000 but was [946233], count [1079]
Expected diff < 500,000 but was [5403041], count [1114]
Expected diff < 500,000 but was [1181114], count [1115]
Expected diff < 500,000 but was [554239], count [1118]
Expected diff < 500,000 but was [1437706], count [1121]
Expected diff < 500,000 but was [577925], count [1240]
Expected diff < 500,000 but was [1226115], count [1241]
Expected diff < 500,000 but was [2194850], count [1285]
Expected diff < 500,000 but was [522264], count [1292]
Expected diff < 500,000 but was [845964], count [1328]
Expected diff < 500,000 but was [3652294], count [1331]
Expected diff < 500,000 but was [727538], count [1343]
Expected diff < 500,000 but was [809252], count [1349]
Expected diff < 500,000 but was [1597188], count [1393]
Expected diff < 500,000 but was [525816], count [1394]
08-Mar-2017 14:15:09.251 INFO [main] org.apache.coyote.AbstractProtocol.pause 
Pausing ProtocolHandler ["http-nio-127.0.0.1-auto-1-54783"]
08-Mar-2017 14:15:09.266 INFO [main] 
org.apache.catalina.core.StandardService.stopInternal Stopping service Tomcat

Then I was able to run the test also in IDEA, by importing Eclipse project and 
modifying the libraries.

I changed the big message size to 16384 bytes and the small size to 1904 bytes 
(the size with which we have most commonly seen the problem).
So the following needs to be added to the servlet configuration:

        <context-param>
                <param-name>org.apache.tomcat.websocket.binaryBufferSize</param-name>
                <param-value>16384</param-value>
        </context-param>
        <context-param>
                <param-name>org.apache.tomcat.websocket.textBufferSize</param-name>
                <param-value>16384</param-value>
        </context-param>

Like this:

        ctx.addParameter(Constants.
                BINARY_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "16384");
        ctx.addParameter(org.apache.tomcat.websocket.server.Constants.
                TEXT_BUFFER_SIZE_SERVLET_CONTEXT_INIT_PARAM, "16384");
        wsContainer.setDefaultMaxBinaryMessageBufferSize(16384);
        wsContainer.setDefaultMaxTextMessageBufferSize(16384);

Then I changed the test logic so that there are only 2 messages, one big and 
one small, but it fails randomly like the original test.
Also added test for maximum delay for the big message:

                    } else if (diff > 60000000) {
                        System.out.println("Expected diff < 60ms but was [" + 
diff + "], count [" + count + "]");
                        fail = true;
                    }

Sent Hello message, waiting for data
Expected diff < 500,000 but was [718458], count [9]
Expected diff < 500,000 but was [1224142], count [17]
Expected diff < 500,000 but was [1102952], count [19]
Expected diff < 500,000 but was [663587], count [21]
Expected diff < 500,000 but was [537659], count [43]
Expected diff < 500,000 but was [3158452], count [73]
Expected diff < 500,000 but was [582662], count [75]
Expected diff < 500,000 but was [825437], count [121]
Expected diff < 500,000 but was [605953], count [129]
Expected diff < 500,000 but was [1092293], count [143]
Expected diff < 500,000 but was [552660], count [145]
Expected diff < 500,000 but was [7926329], count [177]
Expected diff < 500,000 but was [507657], count [189]
Expected diff < 500,000 but was [913468], count [193]
Expected diff < 500,000 but was [560160], count [229]
Expected diff < 500,000 but was [1748774], count [319]
Expected diff < 500,000 but was [529764], count [321]
Expected diff < 500,000 but was [692009], count [381]
Expected diff < 500,000 but was [556213], count [389]
Expected diff < 500,000 but was [2727772], count [403]
Expected diff < 500,000 but was [937154], count [469]
Expected diff < 500,000 but was [544370], count [513]
Expected diff < 500,000 but was [1018473], count [575]
Expected diff < 500,000 but was [518711], count [601]
Expected diff < 500,000 but was [885441], count [613]
Expected diff < 500,000 but was [2075633], count [645]
Expected diff < 500,000 but was [606347], count [715]
Expected diff < 500,000 but was [561345], count [721]
Expected diff < 500,000 but was [947812], count [889]
Expected diff < 500,000 but was [513974], count [923]
Expected diff < 60ms but was [65475211], count [960]
Expected diff > 40ms but was [35346555], count [962]

So the delay is at most 2 milliseconds, which is acceptable.

I think that you understood my problem quite well. It seems that the problem 
can't be reproduced in this test.
I also tried adding sendPing() there but it did not have any effect.
We do not use compression extension.
Thanks for the test! :-)

-Harri

-----Original Message-----
From: Mark Thomas [mailto:ma...@apache.org] 
Sent: 7. maaliskuuta 2017 23:58
To: Tomcat Users List <users@tomcat.apache.org>
Subject: Re: Tomcat WebSocket does not always send asynchronous messages

On 07/03/17 14:55, Mark Thomas wrote:
> On 07/03/17 11:03, Mark Thomas wrote:
>> On 07/03/17 08:28, Pesonen, Harri wrote:
>>> Hello, we have a problem that Tomcat WebSocket does not always send 
>>> asynchronous messages. This problem is random, and it has been reproduced 
>>> in Tomcat 8.5.6 and 8.5.11. Synchronized operations work fine, and also the 
>>> asynchronous operations work except in one special case. When there is a 
>>> big message that we want to send to client, we split it into 16 kB packets 
>>> for technical reasons, and then we send them very quickly after each other 
>>> using
>>>
>>> /**
>>> * Initiates the asynchronous transmission of a binary message. This method 
>>> returns before the message
>>> * is transmitted. Developers provide a callback to be notified when the 
>>> message has been
>>> * transmitted. Errors in transmission are given to the developer in the 
>>> SendResult object.
>>> *
>>> * @param data       the data being sent, must not be {@code null}.
>>> * @param handler the handler that will be notified of progress, must not be 
>>> {@code null}.
>>> * @throws IllegalArgumentException if either the data or the handler are 
>>> {@code null}.
>>> */
>>> void sendBinary(ByteBuffer data, SendHandler handler);
>>>
>>> Because there can be only one ongoing write to socket, we use Semaphore 
>>> that is released on the SendHandler callback:
>>>
>>> public void onResult(javax.websocket.SendResult result) {
>>>     semaphore.release();
>>>
>>> So the code to send is actually:
>>>
>>> semaphore.acquireUninterruptibly();
>>> async.sendBinary(buf, asyncHandler);
>>>
>>> This works fine in most cases. But when we send one 16 kB packet and then 
>>> immediately one smaller packet (4 kB), then randomly the smaller packet is 
>>> not actually sent, but only after we call
>>>
>>> async.sendPing(new byte[0])
>>>
>>> in another thread. sendPing() is called every 20 seconds to keep the 
>>> WebSocket connection alive. This means that the last packet gets extra 
>>> delay on client, which varies between 0 - 20 seconds.
>>>
>>> We have an easy workaround to the problem. If we call flushBatch() after 
>>> each sendBinary(), then it works great, but this means that the sending is 
>>> not actually asynchronous, because flushBatch() is synchronous.
>>> Also we should not be forced to call flushBatch(), because we are not 
>>> enabling batching. Instead we make sure that it is disabled:
>>>
>>> if (async.getBatchingAllowed()) {
>>>     async.setBatchingAllowed(false);
>>>
>>> So the working code is:
>>>
>>> semaphore.acquireUninterruptibly();
>>> async.sendBinary(buf, asyncHandler);
>>> async.flushBatch();
>>>
>>> Normally the code works fine without flushBatch(), if there is delay 
>>> between the messages, but when we send the messages right after each other, 
>>> then the last small message is not always sent immediately.
>>> I looked at the Apache WebSocket code, but it was not clear to me what is 
>>> happening there.
>>> Any ideas what is going on here? Any ideas how I could troubleshoot this 
>>> more?
>>
>> Thanks for providing such a clear description of the problem you are seeing.
>>
>> It sounds like there is a race condition somewhere in the WebSocket
>> code. With the detail you have provided, I think there is a reasonable
>> chance of finding via code inspection.
> 
> Some follow-up questions to help narrow the search.
> 
> This is server side, correct?
> 
> Are you using the compression extension? If yes, do you see the problem
> without it?
> 
> When you say "we split it into 16 kB packets" do you mean you split it
> into multiple WebSocket messages?
> 
> If you insert a short delay before sending the final 4kB does that
> reduce the frequency of the problem?

I've added a (disabled by default) test case to explore the issue
described based on my understanding. It passes for me (ignoring what
look like GC introduced delays) with NIO.

http://svn.apache.org/viewvc?rev=1785893&view=rev

What would be really helpful would be if you could use this as a basis
for providing a test case that demonstrates the problem you are seeing.

Thanks,

Mark

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@tomcat.apache.org
For additional commands, e-mail: users-h...@tomcat.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@tomcat.apache.org
For additional commands, e-mail: users-h...@tomcat.apache.org

Reply via email to