Vladsz83 commented on code in PR #12531:
URL: https://github.com/apache/ignite/pull/12531#discussion_r2580726380
##########
modules/core/src/main/java/org/apache/ignite/internal/IgniteDiagnosticMessage.java:
##########
@@ -56,462 +43,234 @@
*/
public class IgniteDiagnosticMessage implements Message {
/** */
- private static final int REQUEST_FLAG_MASK = 0x01;
-
- /** */
- private byte flags;
+ @Order(value = 0, method = "infoResponse")
+ private @Nullable String infoResp;
/** */
+ @Order(value = 1, method = "futureId")
private long futId;
- /** */
- private byte[] bytes;
+ /** Originator node id. */
+ @Order(2)
+ private UUID nodeId;
+
+ /** Infos to send to a remote node. */
+ @Order(3)
+ private final Set<DiagnosticBaseInfo> infos = new LinkedHashSet<>();
+
+ /** Local message related to remote info. */
+ private final Map<Object, List<String>> msgs = new LinkedHashMap<>();
/**
- * Required by {@link GridIoMessageFactory}.
+ * Default constructor required by {@link GridIoMessageFactory}.
*/
public IgniteDiagnosticMessage() {
// No-op.
}
/**
- * @param marsh Marshaller.
- * @param info Compound info.
- * @param futId Future ID.
- * @return Request message.
- * @throws IgniteCheckedException If failed.
+ * Creates a diagnostic info holder.
+ *
+ * @param nodeId Originator node ID.
*/
- public static IgniteDiagnosticMessage createRequest(Marshaller marsh,
- CompoundInfo info,
- long futId
- ) throws IgniteCheckedException {
- byte[] cBytes = U.marshal(marsh, info);
-
- IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
-
- msg.futId = futId;
- msg.bytes = cBytes;
- msg.flags |= REQUEST_FLAG_MASK;
-
- return msg;
+ IgniteDiagnosticMessage(UUID nodeId) {
+ this.nodeId = nodeId;
}
/**
- * @param resBytes Marshalled result.
+ * Creates a diagnostic request.
+ *
* @param futId Future ID.
- * @return Response message.
- */
- public static IgniteDiagnosticMessage createResponse(byte[] resBytes, long
futId) {
- IgniteDiagnosticMessage msg = new IgniteDiagnosticMessage();
-
- msg.futId = futId;
- msg.bytes = resBytes;
-
- return msg;
- }
-
- /**
- * @param marsh Marshaller.
- * @return Unmarshalled payload.
- * @throws IgniteCheckedException If failed.
+ * @param nodeId Node ID.
+ * @param infos Diagnostic infos.
*/
- @Nullable public <T> T unmarshal(Marshaller marsh)
- throws IgniteCheckedException {
- if (bytes == null)
- return null;
+ public IgniteDiagnosticMessage(long futId, UUID nodeId,
Collection<DiagnosticBaseInfo> infos) {
+ this(nodeId);
- return U.unmarshal(marsh, bytes, null);
+ this.futId = futId;
+ this.infos.addAll(infos);
}
/**
- * @return Future ID.
+ * Creates a diagnostic response.
+ *
+ * @param resp Diagnostic info result.
+ * @param futId Future ID.
*/
- public long futureId() {
- return futId;
+ public IgniteDiagnosticMessage(String resp, long futId) {
+ this.futId = futId;
+ infoResp = resp;
}
/**
- * @return {@code True} if this is request message.
+ * @param ctx Grid context.
+ * @return Diagnostic info.
*/
- public boolean request() {
- return (flags & REQUEST_FLAG_MASK) != 0;
- }
-
- /** {@inheritDoc} */
- @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
- writer.setBuffer(buf);
-
- if (!writer.isHeaderWritten()) {
- if (!writer.writeHeader(directType()))
- return false;
-
- writer.onHeaderWritten();
- }
-
- switch (writer.state()) {
- case 0:
- if (!writer.writeByteArray(bytes))
- return false;
-
- writer.incrementState();
-
- case 1:
- if (!writer.writeByte(flags))
- return false;
-
- writer.incrementState();
-
- case 2:
- if (!writer.writeLong(futId))
- return false;
-
- writer.incrementState();
+ public String diagnosticInfo(GridKernalContext ctx) {
+ try {
+ IgniteInternalFuture<String> commInfo = dumpCommunicationInfo(ctx,
nodeId);
- }
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
- reader.setBuffer(buf);
-
- switch (reader.state()) {
- case 0:
- bytes = reader.readByteArray();
+ StringBuilder sb = new StringBuilder();
- if (!reader.isLastRead())
- return false;
+ dumpNodeBasicInfo(sb, ctx);
- reader.incrementState();
-
- case 1:
- flags = reader.readByte();
+ sb.append(U.nl());
- if (!reader.isLastRead())
- return false;
+ dumpExchangeInfo(sb, ctx);
- reader.incrementState();
+ sb.append(U.nl());
- case 2:
- futId = reader.readLong();
+ ctx.cache().context().io().dumpPendingMessages(sb);
- if (!reader.isLastRead())
- return false;
+ sb.append(commInfo.get(10_000));
- reader.incrementState();
+ moreInfo(sb, ctx);
+ return sb.toString();
}
+ catch (Exception e) {
+ ctx.cluster().diagnosticLog().error("Failed to execute diagnostic
message closure: " + e, e);
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public short directType() {
- return -61;
- }
-
- /**
- *
- */
- public abstract static class DiagnosticBaseInfo {
- /**
- * @param other Another info of the same type.
- */
- public void merge(DiagnosticBaseInfo other) {
- // No-op.
+ return "Failed to execute diagnostic message closure: " + e;
}
-
- /**
- * @param sb String builder.
- * @param ctx Grid context.
- */
- public abstract void appendInfo(StringBuilder sb, GridKernalContext
ctx);
}
/**
- *
+ * @param sb String builder.
+ * @param ctx Grid context.
*/
- public static final class TxEntriesInfo extends DiagnosticBaseInfo
implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private int cacheId;
-
- /** */
- private Collection<KeyCacheObject> keys;
-
- /** Empty constructor required by {@link Externalizable}. */
- public TxEntriesInfo() {
- // No-op.
- }
-
- /**
- * @param cacheId Cache ID.
- * @param keys Keys.
- */
- TxEntriesInfo(int cacheId, Collection<KeyCacheObject> keys) {
- this.cacheId = cacheId;
- this.keys = new HashSet<>(keys);
- }
-
- /** {@inheritDoc} */
- @Override public void appendInfo(StringBuilder sb, GridKernalContext
ctx) {
- sb.append(U.nl());
-
- GridCacheContext<?, ?> cctx =
ctx.cache().context().cacheContext(cacheId);
-
- if (cctx == null) {
- sb.append("Failed to find cache with id: ").append(cacheId);
-
- return;
- }
-
+ private void moreInfo(StringBuilder sb, GridKernalContext ctx) {
+ for (IgniteDiagnosticMessage.DiagnosticBaseInfo baseInfo : infos) {
try {
- for (KeyCacheObject key : keys)
- key.finishUnmarshal(cctx.cacheObjectContext(), null);
+ baseInfo.appendInfo(sb, ctx);
}
- catch (IgniteCheckedException e) {
- ctx.cluster().diagnosticLog().error("Failed to unmarshal key:
" + e, e);
+ catch (Exception e) {
+ ctx.cluster().diagnosticLog().error(
+ "Failed to populate diagnostic with additional
information: " + e, e);
- sb.append("Failed to unmarshal key:
").append(e).append(U.nl());
+ sb.append(U.nl()).append("Failed to populate diagnostic with
additional information: ").append(e);
}
-
- sb.append("Cache entries [cacheId=").append(cacheId)
- .append(", cacheName=").append(cctx.name()).append("]: ");
-
- for (KeyCacheObject key : keys) {
- GridCacheMapEntry e =
(GridCacheMapEntry)cctx.cache().peekEx(key);
-
- sb.append(U.nl()).append(" Key
[key=").append(key).append(", entry=").append(e).append("]");
- }
- }
-
- /** {@inheritDoc} */
- @Override public void merge(DiagnosticBaseInfo other) {
- TxEntriesInfo other0 = (TxEntriesInfo)other;
-
- assert other0 != null && cacheId == other0.cacheId : other;
-
- this.keys.addAll(other0.keys);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws
IOException {
- out.writeInt(cacheId);
- U.writeCollection(out, keys);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- cacheId = in.readInt();
- keys = U.readCollection(in);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- TxEntriesInfo that = (TxEntriesInfo)o;
-
- return cacheId == that.cacheId;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(getClass(), cacheId);
}
}
/**
- *
+ * @return Initial message.
*/
- public static final class ExchangeInfo extends DiagnosticBaseInfo
implements Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private AffinityTopologyVersion topVer;
+ public String message() {
+ StringBuilder sb = new StringBuilder();
- /** Empty constructor required by {@link Externalizable}. */
- public ExchangeInfo() {
- // No-op.
- }
+ for (List<String> msgs0 : msgs.values()) {
+ for (String msg : msgs0) {
+ if (sb.length() > 0)
+ sb.append('\n');
- /**
- * @param topVer Exchange version.
- */
- ExchangeInfo(AffinityTopologyVersion topVer) {
- this.topVer = topVer;
+ sb.append(msg);
+ }
}
- /** {@inheritDoc} */
- @Override public void appendInfo(StringBuilder sb, GridKernalContext
ctx) {
- sb.append(U.nl());
+ return sb.toString();
+ }
- List<GridDhtPartitionsExchangeFuture> futs =
ctx.cache().context().exchange().exchangeFutures();
+ /**
+ * @param msg Message.
+ * @param baseInfo Info or {@code null} if only basic info is needed.
+ */
+ void add(String msg, @Nullable IgniteDiagnosticMessage.DiagnosticBaseInfo
baseInfo) {
+ Object key = baseInfo != null ? baseInfo : getClass();
- for (GridDhtPartitionsExchangeFuture fut : futs) {
- if (topVer.equals(fut.initialVersion())) {
- sb.append("Exchange future: ").append(fut);
+ msgs.computeIfAbsent(key, k -> new ArrayList<>()).add(msg);
- return;
+ if (baseInfo != null) {
+ if (!infos.add(baseInfo) && baseInfo instanceof TxEntriesInfo) {
+ for (IgniteDiagnosticMessage.DiagnosticBaseInfo baseInfo0 :
infos) {
+ if (baseInfo0.equals(baseInfo))
+ baseInfo0.merge(baseInfo);
}
}
-
- sb.append("Failed to find exchange future: ").append(topVer);
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws
IOException {
- out.writeObject(topVer);
}
+ }
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- topVer = (AffinityTopologyVersion)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- ExchangeInfo that = (ExchangeInfo)o;
-
- return Objects.equals(topVer, that.topVer);
- }
+ /** */
+ public UUID nodeId() {
+ return nodeId;
+ }
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return Objects.hash(getClass(), topVer);
- }
+ /** */
+ public void nodeId(UUID nodeId) {
+ this.nodeId = nodeId;
}
/**
- *
+ * @return Future ID.
*/
- public static final class TxInfo extends DiagnosticBaseInfo implements
Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private GridCacheVersion dhtVer;
-
- /** */
- private GridCacheVersion nearVer;
-
- /** Empty constructor required by {@link Externalizable}. */
- public TxInfo() {
- // No-op.
- }
-
- /**
- * @param dhtVer Tx dht version.
- * @param nearVer Tx near version.
- */
- TxInfo(GridCacheVersion dhtVer, GridCacheVersion nearVer) {
- this.dhtVer = dhtVer;
- this.nearVer = nearVer;
- }
+ public long futureId() {
+ return futId;
+ }
- /** {@inheritDoc} */
- @Override public void appendInfo(StringBuilder sb, GridKernalContext
ctx) {
- sb.append(U.nl())
- .append("Related transactions [dhtVer=").append(dhtVer)
- .append(", nearVer=").append(nearVer).append("]: ");
-
- boolean found = false;
-
- for (IgniteInternalTx tx :
ctx.cache().context().tm().activeTransactions()) {
- if (dhtVer.equals(tx.xidVersion()) ||
nearVer.equals(tx.nearXidVersion())) {
- sb.append(U.nl())
- .append(" ")
- .append(tx.getClass().getSimpleName())
- .append(" [ver=").append(tx.xidVersion())
- .append(", nearVer=").append(tx.nearXidVersion())
- .append(", topVer=").append(tx.topologyVersion())
- .append(", state=").append(tx.state())
- .append(", fullTx=").append(tx).append(']');
-
- found = true;
- }
- }
+ /** */
+ public void futureId(long futId) {
+ this.futId = futId;
+ }
- if (!found)
- sb.append(U.nl()).append("Failed to find related
transactions.");
- }
+ /**
+ * @return {@code True} if this is request message.
+ */
+ public boolean request() {
+ return infoResp == null;
+ }
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws
IOException {
- out.writeObject(dhtVer);
- out.writeObject(nearVer);
- }
+ /** @return Compound diagnostic infos. */
+ public Collection<DiagnosticBaseInfo> infos() {
+ return Collections.unmodifiableCollection(infos);
Review Comment:
There is an `add()` method, so the underlying collection is not internally immutable.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]