This is an automated email from the ASF dual-hosted git repository. tabish pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-protonj2.git
The following commit(s) were added to refs/heads/main by this push: new b36e825b PROTON-2637 Clean up performative writers and cache type encoders b36e825b is described below commit b36e825b3b2bc1f294c56b1de4c44200cf4127e6 Author: Timothy Bish <tabish...@gmail.com> AuthorDate: Tue Nov 1 13:09:54 2022 -0400 PROTON-2637 Clean up performative writers and cache type encoders Cleans up some of the code in the encoders for AMQP performatives and create a cache for the performative type to avoid each write needing to lookup the TypeEncoder for a performative since those are fixed once the engine has started. --- .../qpid/protonj2/codec/PerformativeEncoder.java | 157 +++++++++++++++++++++ .../decoders/transport/AttachTypeDecoder.java | 20 +-- .../codec/decoders/transport/BeginTypeDecoder.java | 2 +- .../decoders/transport/DetachTypeDecoder.java | 2 +- .../decoders/transport/DispositionTypeDecoder.java | 9 +- .../codec/decoders/transport/FlowTypeDecoder.java | 5 +- .../codec/decoders/transport/OpenTypeDecoder.java | 5 +- .../decoders/transport/TransferTypeDecoder.java | 5 +- .../engine/impl/ProtonFrameEncodingHandler.java | 26 ++-- .../qpid/protonj2/engine/impl/ProtonLink.java | 8 +- .../impl/ProtonFrameEncodingHandlerTest.java | 4 + 11 files changed, 200 insertions(+), 43 deletions(-) diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/PerformativeEncoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/PerformativeEncoder.java new file mode 100644 index 00000000..3030ef75 --- /dev/null +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/PerformativeEncoder.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.qpid.protonj2.codec; + +import org.apache.qpid.protonj2.buffer.ProtonBuffer; +import org.apache.qpid.protonj2.types.transport.Attach; +import org.apache.qpid.protonj2.types.transport.Begin; +import org.apache.qpid.protonj2.types.transport.Close; +import org.apache.qpid.protonj2.types.transport.Detach; +import org.apache.qpid.protonj2.types.transport.Disposition; +import org.apache.qpid.protonj2.types.transport.End; +import org.apache.qpid.protonj2.types.transport.Flow; +import org.apache.qpid.protonj2.types.transport.Open; +import org.apache.qpid.protonj2.types.transport.Performative.PerformativeHandler; +import org.apache.qpid.protonj2.types.transport.Transfer; + +/** + * AMQP Performative type specific encoder that uses any {@link Encoder} to + * cache the specific type encoders for various section types and use them + * directly instead of looking them up via calls to + * {@link Encoder#writeObject(org.apache.qpid.protonj2.buffer.ProtonBuffer, EncoderState, Object)} + */ +public final class PerformativeEncoder implements PerformativeHandler<Encoder> { + + private final TypeEncoder<Attach> attachEncoder; + private final TypeEncoder<Begin> beginEncoder; + private final TypeEncoder<Close> closeEncoder; + private final TypeEncoder<Detach> detachEncoder; + private final TypeEncoder<Disposition> dispositionEncoder; + private final TypeEncoder<End> endEncoder; + private final TypeEncoder<Flow> flowEncoder; + private final TypeEncoder<Open> openEncoder; + private final TypeEncoder<Transfer> transferEncoder; + + private final Encoder encoder; + private final EncoderState encoderState; + + @SuppressWarnings("unchecked") + public PerformativeEncoder(Encoder encoder) { + this.encoder = encoder; + this.encoderState = encoder.newEncoderState(); + + attachEncoder = (TypeEncoder<Attach>) encoder.getTypeEncoder(Attach.class); + beginEncoder = (TypeEncoder<Begin>) encoder.getTypeEncoder(Begin.class); + closeEncoder = (TypeEncoder<Close>) encoder.getTypeEncoder(Close.class); + detachEncoder = (TypeEncoder<Detach>) encoder.getTypeEncoder(Detach.class); + dispositionEncoder = (TypeEncoder<Disposition>) encoder.getTypeEncoder(Disposition.class); + endEncoder = (TypeEncoder<End>) encoder.getTypeEncoder(End.class); + flowEncoder = (TypeEncoder<Flow>) encoder.getTypeEncoder(Flow.class); + openEncoder = (TypeEncoder<Open>) encoder.getTypeEncoder(Open.class); + transferEncoder = (TypeEncoder<Transfer>) encoder.getTypeEncoder(Transfer.class); + } + + public Encoder getEncoder() { + return encoder; + } + + public EncoderState getEncoderState() { + return encoderState; + } + + @Override + public void handleOpen(Open open, ProtonBuffer target, int channel, Encoder encoder) { + try { + openEncoder.writeType(target, encoderState, open); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleBegin(Begin begin, ProtonBuffer target, int channel, Encoder encoder) { + try { + beginEncoder.writeType(target, encoderState, begin); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleAttach(Attach attach, ProtonBuffer target, int channel, Encoder encoder) { + try { + attachEncoder.writeType(target, encoderState, attach); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleFlow(Flow flow, ProtonBuffer target, int channel, Encoder encoder) { + try { + flowEncoder.writeType(target, encoderState, flow); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleTransfer(Transfer transfer, ProtonBuffer target, int channel, Encoder encoder) { + try { + transferEncoder.writeType(target, encoderState, transfer); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleDisposition(Disposition disposition, ProtonBuffer target, int channel, Encoder encoder) { + try { + dispositionEncoder.writeType(target, encoderState, disposition); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleDetach(Detach detach, ProtonBuffer target, int channel, Encoder encoder) { + try { + detachEncoder.writeType(target, encoderState, detach); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleEnd(End end, ProtonBuffer target, int channel, Encoder encoder) { + try { + endEncoder.writeType(target, encoderState, end); + } finally { + encoderState.reset(); + } + } + + @Override + public void handleClose(Close close, ProtonBuffer target, int channel, Encoder encoder) { + try { + closeEncoder.writeType(target, encoderState, close); + } finally { + encoderState.reset(); + } + } +} diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/AttachTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/AttachTypeDecoder.java index 13c4228f..80550b9f 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/AttachTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/AttachTypeDecoder.java @@ -116,7 +116,7 @@ public final class AttachTypeDecoder extends AbstractDescribedTypeDecoder<Attach throw new DecodeException(errorForMissingRequiredFields(index)); } - buffer.readByte(); + buffer.skipBytes(1); continue; } @@ -128,16 +128,13 @@ public final class AttachTypeDecoder extends AbstractDescribedTypeDecoder<Attach attach.setHandle(decoder.readUnsignedInteger(buffer, state, 0l)); break; case 2: - Boolean role = decoder.readBoolean(buffer, state); - attach.setRole(Boolean.TRUE.equals(role) ? Role.RECEIVER : Role.SENDER); + attach.setRole(decoder.readBoolean(buffer, state, false) ? Role.RECEIVER : Role.SENDER); break; case 3: - byte sndSettleMode = decoder.readUnsignedByte(buffer, state, (byte) 2); - attach.setSenderSettleMode(SenderSettleMode.valueOf(sndSettleMode)); + attach.setSenderSettleMode(SenderSettleMode.valueOf(decoder.readUnsignedByte(buffer, state, (byte) 2))); break; case 4: - byte rcvSettleMode = decoder.readUnsignedByte(buffer, state, (byte) 0); - attach.setReceiverSettleMode(ReceiverSettleMode.valueOf(rcvSettleMode)); + attach.setReceiverSettleMode(ReceiverSettleMode.valueOf(decoder.readUnsignedByte(buffer, state, (byte) 0))); break; case 5: attach.setSource(decoder.readObject(buffer, state, Source.class)); @@ -242,16 +239,13 @@ public final class AttachTypeDecoder extends AbstractDescribedTypeDecoder<Attach attach.setHandle(decoder.readUnsignedInteger(stream, state, 0l)); break; case 2: - Boolean role = decoder.readBoolean(stream, state); - attach.setRole(Boolean.TRUE.equals(role) ? Role.RECEIVER : Role.SENDER); + attach.setRole(decoder.readBoolean(stream, state, false) ? Role.RECEIVER : Role.SENDER); break; case 3: - byte sndSettleMode = decoder.readUnsignedByte(stream, state, (byte) 2); - attach.setSenderSettleMode(SenderSettleMode.valueOf(sndSettleMode)); + attach.setSenderSettleMode(SenderSettleMode.valueOf(decoder.readUnsignedByte(stream, state, (byte) 2))); break; case 4: - byte rcvSettleMode = decoder.readUnsignedByte(stream, state, (byte) 0); - attach.setReceiverSettleMode(ReceiverSettleMode.valueOf(rcvSettleMode)); + attach.setReceiverSettleMode(ReceiverSettleMode.valueOf(decoder.readUnsignedByte(stream, state, (byte) 0))); break; case 5: attach.setSource(decoder.readObject(stream, state, Source.class)); diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/BeginTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/BeginTypeDecoder.java index a9206ec2..a936eaa5 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/BeginTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/BeginTypeDecoder.java @@ -111,7 +111,7 @@ public final class BeginTypeDecoder extends AbstractDescribedTypeDecoder<Begin> throw new DecodeException(errorForMissingRequiredFields(index)); } - buffer.readByte(); + buffer.skipBytes(1); continue; } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DetachTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DetachTypeDecoder.java index 4bcb5cb2..a0ca1b99 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DetachTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DetachTypeDecoder.java @@ -110,7 +110,7 @@ public final class DetachTypeDecoder extends AbstractDescribedTypeDecoder<Detach if (index == 0) { throw new DecodeException("The handle field is mandatory in a Detach"); } - buffer.readByte(); + buffer.skipBytes(1); continue; } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DispositionTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DispositionTypeDecoder.java index fcad0449..a5467f8e 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DispositionTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/DispositionTypeDecoder.java @@ -106,20 +106,19 @@ public final class DispositionTypeDecoder extends AbstractDescribedTypeDecoder<D // Peek ahead and see if there is a null in the next slot, if so we don't call // the setter for that entry to ensure the returned type reflects the encoded // state in the modification entry. - final boolean nullValue = buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL; - if (nullValue) { + if (buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL) { // Ensure mandatory fields are set if (index < MIN_DISPOSITION_LIST_ENTRIES) { throw new DecodeException(errorForMissingRequiredFields(index)); } - buffer.readByte(); + buffer.skipBytes(1); continue; } switch (index) { case 0: - disposition.setRole(Boolean.TRUE.equals(decoder.readBoolean(buffer, state)) ? Role.RECEIVER : Role.SENDER); + disposition.setRole(decoder.readBoolean(buffer, state, false) ? Role.RECEIVER : Role.SENDER); break; case 1: disposition.setFirst(decoder.readUnsignedInteger(buffer, state, 0l)); @@ -215,7 +214,7 @@ public final class DispositionTypeDecoder extends AbstractDescribedTypeDecoder<D switch (index) { case 0: - disposition.setRole(Boolean.TRUE.equals(decoder.readBoolean(stream, state)) ? Role.RECEIVER : Role.SENDER); + disposition.setRole(decoder.readBoolean(stream, state, false) ? Role.RECEIVER : Role.SENDER); break; case 1: disposition.setFirst(decoder.readUnsignedInteger(stream, state, 0l)); diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/FlowTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/FlowTypeDecoder.java index 7b28fe91..fa39c036 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/FlowTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/FlowTypeDecoder.java @@ -105,14 +105,13 @@ public final class FlowTypeDecoder extends AbstractDescribedTypeDecoder<Flow> { // Peek ahead and see if there is a null in the next slot, if so we don't call // the setter for that entry to ensure the returned type reflects the encoded // state in the modification entry. - final boolean nullValue = buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL; - if (nullValue) { + if (buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL) { // Ensure mandatory fields are set if (index > 0 && index < MIN_FLOW_LIST_ENTRIES) { throw new DecodeException(errorForMissingRequiredFields(index)); } - buffer.readByte(); + buffer.skipBytes(1); continue; } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/OpenTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/OpenTypeDecoder.java index d592d13f..1d26ceef 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/OpenTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/OpenTypeDecoder.java @@ -104,12 +104,11 @@ public final class OpenTypeDecoder extends AbstractDescribedTypeDecoder<Open> { // Peek ahead and see if there is a null in the next slot, if so we don't call // the setter for that entry to ensure the returned type reflects the encoded // state in the modification entry. - final boolean nullValue = buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL; - if (nullValue) { + if (buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL) { if (index == 0) { throw new DecodeException("The container-id field cannot be omitted from the Open"); } - buffer.readByte(); + buffer.skipBytes(1); continue; } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/TransferTypeDecoder.java b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/TransferTypeDecoder.java index 4085628a..f48d4aa3 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/TransferTypeDecoder.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/codec/decoders/transport/TransferTypeDecoder.java @@ -108,13 +108,12 @@ public final class TransferTypeDecoder extends AbstractDescribedTypeDecoder<Tran // Peek ahead and see if there is a null in the next slot, if so we don't call // the setter for that entry to ensure the returned type reflects the encoded // state in the modification entry. - final boolean nullValue = buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL; - if (nullValue) { + if (buffer.getByte(buffer.getReadIndex()) == EncodingCodes.NULL) { if (index == 0) { throw new DecodeException("The handle field cannot be omitted from the Transfer"); } - buffer.readByte(); + buffer.skipBytes(1); continue; } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandler.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandler.java index 5ecaa2e4..0f65dd31 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandler.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandler.java @@ -22,6 +22,7 @@ import org.apache.qpid.protonj2.codec.CodecFactory; import org.apache.qpid.protonj2.codec.EncodeException; import org.apache.qpid.protonj2.codec.Encoder; import org.apache.qpid.protonj2.codec.EncoderState; +import org.apache.qpid.protonj2.codec.PerformativeEncoder; import org.apache.qpid.protonj2.engine.EngineHandler; import org.apache.qpid.protonj2.engine.EngineHandlerContext; import org.apache.qpid.protonj2.engine.HeaderEnvelope; @@ -51,8 +52,8 @@ public class ProtonFrameEncodingHandler implements EngineHandler { private static final int FRAME_START_BYTE = 0; private static final int FRAME_DOFF_BYTE = 4; - private static final int FRAME_TYPE_BYTE = 5; - private static final int FRAME_CHANNEL_BYTE = 6; + + private static final int FRAME_HEADER_PREFIX = FRAME_DOFF_SIZE << 24 | AMQP_FRAME_TYPE << 15; private static final byte[] SASL_FRAME_HEADER = new byte[] { 0, 0, 0, 0, FRAME_DOFF_SIZE, SASL_FRAME_TYPE, 0, 0 }; @@ -61,8 +62,8 @@ public class ProtonFrameEncodingHandler implements EngineHandler { private final Encoder saslEncoder = CodecFactory.getSaslEncoder(); private final EncoderState saslEncoderState = saslEncoder.newEncoderState(); private final Encoder amqpEncoder = CodecFactory.getEncoder(); - private final EncoderState amqpEncoderState = amqpEncoder.newEncoderState(); + private PerformativeEncoder encoder; private ProtonEngine engine; private ProtonEngineConfiguration configuration; @@ -74,6 +75,11 @@ public class ProtonFrameEncodingHandler implements EngineHandler { ((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_WRITES); } + @Override + public void engineStarting(EngineHandlerContext context) { + encoder = new PerformativeEncoder(amqpEncoder); + } + @Override public void handleWrite(EngineHandlerContext context, HeaderEnvelope envelope) { context.fireWrite(envelope.getBody().getBuffer(), null); @@ -104,12 +110,12 @@ public class ProtonFrameEncodingHandler implements EngineHandler { final int outputBufferSize = Math.min(maxFrameSize, AMQP_PERFORMATIVE_PAD + payload.getReadableBytes()); final ProtonBuffer output = configuration.getBufferAllocator().outputBuffer(outputBufferSize, maxFrameSize); - writePerformative(output, amqpEncoder, amqpEncoderState, envelope.getBody()); + writePerformative(output, encoder, envelope.getChannel(), envelope.getBody()); if (payload.getReadableBytes() > output.getMaxWritableBytes()) { envelope.handlePayloadToLarge(); - writePerformative(output, amqpEncoder, amqpEncoderState, envelope.getBody()); + writePerformative(output, encoder, envelope.getChannel(), envelope.getBody()); output.writeBytes(payload, output.getMaxWritableBytes()); } else { @@ -118,22 +124,18 @@ public class ProtonFrameEncodingHandler implements EngineHandler { // Now fill in the frame header with the specified information output.setInt(FRAME_START_BYTE, output.getReadableBytes()); - output.setByte(FRAME_DOFF_BYTE, FRAME_DOFF_SIZE); - output.setByte(FRAME_TYPE_BYTE, AMQP_FRAME_TYPE); - output.setShort(FRAME_CHANNEL_BYTE, (short) envelope.getChannel()); + output.setInt(FRAME_DOFF_BYTE, FRAME_HEADER_PREFIX | envelope.getChannel()); context.fireWrite(output, envelope::handleOutgoingFrameWriteComplete); } - private static void writePerformative(ProtonBuffer target, Encoder encoder, EncoderState state, Performative performative) { + private static void writePerformative(ProtonBuffer target, PerformativeEncoder encoder, int channel, Performative performative) { target.setWriteIndex(FRAME_HEADER_SIZE); try { - encoder.writeObject(target, state, performative); + performative.invoke(encoder, target, channel, encoder.getEncoder()); } catch (EncodeException ex) { throw new FrameEncodingException(ex); - } finally { - state.reset(); } } } diff --git a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonLink.java b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonLink.java index 628282c2..3cc7a8d2 100644 --- a/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonLink.java +++ b/protonj2/src/main/java/org/apache/qpid/protonj2/engine/impl/ProtonLink.java @@ -692,12 +692,16 @@ public abstract class ProtonLink<L extends Link<L>> extends ProtonEndpoint<L> im } final L remoteDisposition(Disposition disposition, ProtonOutgoingDelivery delivery) { - LOG.trace("Link:{} Received remote disposition:{} for sent delivery:{}", self(), disposition, delivery); + if (LOG.isTraceEnabled()) { + LOG.trace("Link:{} Received remote disposition:{} for sent delivery:{}", self(), disposition, delivery); + } return handleRemoteDisposition(disposition, delivery); } final L remoteDisposition(Disposition disposition, ProtonIncomingDelivery delivery) { - LOG.trace("Link:{} Received remote disposition:{} for received delivery:{}", self(), disposition, delivery); + if (LOG.isTraceEnabled()) { + LOG.trace("Link:{} Received remote disposition:{} for received delivery:{}", self(), disposition, delivery); + } return handleRemoteDisposition(disposition, delivery); } diff --git a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandlerTest.java b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandlerTest.java index fb53d334..f8dfcd34 100644 --- a/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandlerTest.java +++ b/protonj2/src/test/java/org/apache/qpid/protonj2/engine/impl/ProtonFrameEncodingHandlerTest.java @@ -74,6 +74,7 @@ class ProtonFrameEncodingHandlerTest { void testEncodeBasicTransfer() { ProtonFrameEncodingHandler handler = new ProtonFrameEncodingHandler(); handler.handlerAdded(context); + handler.engineStarting(context); Transfer transfer = new Transfer(); transfer.setHandle(0); @@ -110,6 +111,7 @@ class ProtonFrameEncodingHandlerTest { void testEncodeBasicTransferWthPayloadThatFitsIntoFrame() { ProtonFrameEncodingHandler handler = new ProtonFrameEncodingHandler(); handler.handlerAdded(context); + handler.engineStarting(context); Transfer transfer = new Transfer(); transfer.setHandle(0); @@ -150,6 +152,7 @@ class ProtonFrameEncodingHandlerTest { void testEncodeBasicTransferWthPayloadThatDoesNotFitIntoFrame() { ProtonFrameEncodingHandler handler = new ProtonFrameEncodingHandler(); handler.handlerAdded(context); + handler.engineStarting(context); Transfer transfer = new Transfer(); transfer.setHandle(0); @@ -197,6 +200,7 @@ class ProtonFrameEncodingHandlerTest { void testOutgoingFrameIsReleasedAfterWriteFinishes() { ProtonFrameEncodingHandler handler = new ProtonFrameEncodingHandler(); handler.handlerAdded(context); + handler.engineStarting(context); Transfer transfer = new Transfer(); transfer.setHandle(0); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org