Copilot commented on code in PR #6055:
URL: https://github.com/apache/shenyu/pull/6055#discussion_r2212681534
##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final
Publisher<? extends Data
byte[] inBytes = new byte[ro.remaining()];
ro.get(inBytes);
+ byte[] processedBytes;
if (isGzip) {
int offset = 0;
- int len = inBytes.length;
- if (!headerSkipped.get()) {
+ if (!headerSkipped.getAndSet(true)) {
offset = skipGzipHeader(inBytes);
- headerSkipped.set(true);
}
- inflater.setInput(inBytes, offset, len -
offset);
+ inflater.setInput(inBytes, offset,
inBytes.length - offset);
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
try {
int cnt;
while ((cnt =
inflater.inflate(outBuf)) > 0) {
-
writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+ baos.write(outBuf, 0, cnt);
}
} catch (DataFormatException ex) {
- LOG.error("inflater decompression
failed", ex);
+ LOG.error("Inflater decompression
failed", ex);
}
+ processedBytes = baos.toByteArray();
} else {
- writer.write(ro);
+ processedBytes = inBytes;
+ }
+ String chunk = new String(processedBytes,
StandardCharsets.UTF_8);
+ for (String line : chunk.split("\\r?\\n")) {
+ if (!line.startsWith("data:")) {
+ continue;
+ }
+ String payload =
line.substring("data:".length()).trim();
+ if (payload.isEmpty() ||
"[DONE]".equals(payload)) {
+ continue;
+ }
+ if (!payload.startsWith("{")) {
+ continue;
+ }
+ try {
+ JsonNode node =
MAPPER.readTree(payload);
+ JsonNode usage =
node.get(Constants.USAGE);
+ if (Objects.nonNull(usage) &&
usage.has(Constants.COMPLETION_TOKENS)) {
+ long c =
usage.get(Constants.COMPLETION_TOKENS).asLong();
+ tokensRecorder.accept(c);
+ streamingUsageRecorded.set(true);
+ }
+ } catch (Exception e) {
+ LOG.error("parse ai resp error", e);
+ }
}
+ writer.write(ByteBuffer.wrap(processedBytes));
});
} catch (Exception e) {
LOG.error("read dataBuffer error", e);
}
})
.doFinally(signal -> {
- // release inflater
if (Objects.nonNull(inflater)) {
inflater.end();
}
- String responseBody = writer.output();
- AiModel aiModel =
exchange.getAttribute(Constants.AI_MODEL);
- long tokens =
Objects.requireNonNull(aiModel).getCompletionTokens(responseBody);
- tokensRecorder.accept(tokens);
+ if (!streamingUsageRecorded.get()) {
+ String sse = writer.output();
+ long usageTokens = extractUsageTokensFromSse(sse);
+ tokensRecorder.accept(usageTokens);
+ }
});
}
+ private long extractUsageTokensFromSse(final String sse) {
+ Pattern p =
Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");
+ Matcher m = p.matcher(sse);
Review Comment:
The regex pattern is compiled on every method call. Consider making the
Pattern a static final field to improve performance.
```suggestion
private static final Pattern COMPLETION_TOKENS_PATTERN =
Pattern.compile("\"completion_tokens\"\\s*:\\s*(\\d+)");
private long extractUsageTokensFromSse(final String sse) {
Matcher m = COMPLETION_TOKENS_PATTERN.matcher(sse);
```
##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final
Publisher<? extends Data
byte[] inBytes = new byte[ro.remaining()];
ro.get(inBytes);
+ byte[] processedBytes;
if (isGzip) {
int offset = 0;
- int len = inBytes.length;
- if (!headerSkipped.get()) {
+ if (!headerSkipped.getAndSet(true)) {
offset = skipGzipHeader(inBytes);
- headerSkipped.set(true);
}
- inflater.setInput(inBytes, offset, len -
offset);
+ inflater.setInput(inBytes, offset,
inBytes.length - offset);
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
try {
int cnt;
while ((cnt =
inflater.inflate(outBuf)) > 0) {
-
writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+ baos.write(outBuf, 0, cnt);
}
} catch (DataFormatException ex) {
- LOG.error("inflater decompression
failed", ex);
+ LOG.error("Inflater decompression
failed", ex);
}
+ processedBytes = baos.toByteArray();
} else {
- writer.write(ro);
+ processedBytes = inBytes;
+ }
+ String chunk = new String(processedBytes,
StandardCharsets.UTF_8);
+ for (String line : chunk.split("\\r?\\n")) {
+ if (!line.startsWith("data:")) {
+ continue;
+ }
+ String payload =
line.substring("data:".length()).trim();
+ if (payload.isEmpty() ||
"[DONE]".equals(payload)) {
+ continue;
+ }
+ if (!payload.startsWith("{")) {
+ continue;
+ }
+ try {
+ JsonNode node =
MAPPER.readTree(payload);
+ JsonNode usage =
node.get(Constants.USAGE);
+ if (Objects.nonNull(usage) &&
usage.has(Constants.COMPLETION_TOKENS)) {
+ long c =
usage.get(Constants.COMPLETION_TOKENS).asLong();
+ tokensRecorder.accept(c);
+ streamingUsageRecorded.set(true);
+ }
+ } catch (Exception e) {
+ LOG.error("parse ai resp error", e);
+ }
}
+ writer.write(ByteBuffer.wrap(processedBytes));
});
} catch (Exception e) {
LOG.error("read dataBuffer error", e);
Review Comment:
Catching generic Exception is too broad. Consider catching specific
exceptions like JsonProcessingException or IOException to handle different
error scenarios appropriately.
```suggestion
} catch (JsonProcessingException e) {
LOG.error("JSON processing error
while parsing AI response", e);
} catch (IOException e) {
LOG.error("IO error while parsing AI
response", e);
}
}
writer.write(ByteBuffer.wrap(processedBytes));
});
} catch (IOException e) {
LOG.error("IO error while reading dataBuffer",
e);
```
##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final
Publisher<? extends Data
byte[] inBytes = new byte[ro.remaining()];
ro.get(inBytes);
+ byte[] processedBytes;
if (isGzip) {
int offset = 0;
- int len = inBytes.length;
- if (!headerSkipped.get()) {
+ if (!headerSkipped.getAndSet(true)) {
offset = skipGzipHeader(inBytes);
- headerSkipped.set(true);
}
- inflater.setInput(inBytes, offset, len -
offset);
+ inflater.setInput(inBytes, offset,
inBytes.length - offset);
+ ByteArrayOutputStream baos = new
ByteArrayOutputStream();
try {
int cnt;
while ((cnt =
inflater.inflate(outBuf)) > 0) {
-
writer.write(ByteBuffer.wrap(outBuf, 0, cnt));
+ baos.write(outBuf, 0, cnt);
}
} catch (DataFormatException ex) {
- LOG.error("inflater decompression
failed", ex);
+ LOG.error("Inflater decompression
failed", ex);
}
+ processedBytes = baos.toByteArray();
} else {
- writer.write(ro);
+ processedBytes = inBytes;
+ }
+ String chunk = new String(processedBytes,
StandardCharsets.UTF_8);
+ for (String line : chunk.split("\\r?\\n")) {
+ if (!line.startsWith("data:")) {
+ continue;
+ }
+ String payload =
line.substring("data:".length()).trim();
+ if (payload.isEmpty() ||
"[DONE]".equals(payload)) {
+ continue;
+ }
+ if (!payload.startsWith("{")) {
+ continue;
+ }
+ try {
+ JsonNode node =
MAPPER.readTree(payload);
+ JsonNode usage =
node.get(Constants.USAGE);
+ if (Objects.nonNull(usage) &&
usage.has(Constants.COMPLETION_TOKENS)) {
+ long c =
usage.get(Constants.COMPLETION_TOKENS).asLong();
+ tokensRecorder.accept(c);
+ streamingUsageRecorded.set(true);
+ }
+ } catch (Exception e) {
+ LOG.error("parse ai resp error", e);
Review Comment:
The error message 'parse ai resp error' is unclear and uses abbreviations.
Consider a more descriptive message like 'Failed to parse AI response JSON
payload'.
```suggestion
LOG.error("Failed to parse AI
response JSON payload", e);
```
##########
shenyu-plugin/shenyu-plugin-ai/shenyu-plugin-ai-token-limiter/src/main/java/org/apache/shenyu/plugin/ai/token/limiter/AiTokenLimiterPlugin.java:
##########
@@ -228,42 +242,78 @@ private Flux<? extends DataBuffer> appendResponse(final
Publisher<? extends Data
byte[] inBytes = new byte[ro.remaining()];
ro.get(inBytes);
+ byte[] processedBytes;
if (isGzip) {
int offset = 0;
- int len = inBytes.length;
- if (!headerSkipped.get()) {
+ if (!headerSkipped.getAndSet(true)) {
Review Comment:
getAndSet(true) is already atomic, so there is no race condition here; the minor
issue is that it performs an unconditional volatile write even when the flag is
already set. compareAndSet(false, true) is the conventional one-time-initialization
idiom: it expresses the intent more clearly and skips the redundant write on
subsequent chunks.
```suggestion
if (headerSkipped.compareAndSet(false,
true)) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]