This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c2c2c3069 [INLONG-6005][DataProxy] Fix resend read values (#6006)
c2c2c3069 is described below
commit c2c2c3069d325c4cf9040c9a96452757d6a0b93b
Author: Lucas <[email protected]>
AuthorDate: Fri Sep 23 18:43:51 2022 +0800
[INLONG-6005][DataProxy] Fix resend read values (#6006)
---
.../org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java | 2 ++
.../src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java | 6 ------
.../src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java | 4 ----
3 files changed, 2 insertions(+), 10 deletions(-)
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
index e420e03a2..cb52823bb 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/metrics/DataProxyMetricItemSet.java
@@ -161,6 +161,8 @@ public class DataProxyMetricItemSet extends MetricItemSet<DataProxyMetricItem> {
metricItem.sendFailCount.incrementAndGet();
metricItem.sendFailSize.addAndGet(event.getBody().length);
}
+ metricItem.sendCount.incrementAndGet();
+ metricItem.sendSize.addAndGet(event.getBody().length);
}
}
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
index 3785ae993..cac638ce5 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/PulsarSink.java
@@ -402,14 +402,8 @@ public class PulsarSink extends AbstractSink implements Configurable, SendMessag
+ "--> pulsar,Check if pulsar server or network is
ok.(if this situation "
+ "last long time it will cause memoryChannel full
and fileChannel write.)", getName());
tx.rollback();
- // metric
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, false, event.getBody().length);
} else {
tx.commit();
- // metric
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, true, event.getBody().length);
}
} else {
status = Status.BACKOFF;
diff --git a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
index 86787136a..feb7fc390 100644
--- a/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
+++ b/inlong-dataproxy/dataproxy-source/src/main/java/org/apache/inlong/dataproxy/sink/TubeSink.java
@@ -273,12 +273,8 @@ public class TubeSink extends AbstractSink implements Configurable {
if (eventQueue.offer(event, 3 * 1000, TimeUnit.MILLISECONDS)) {
tx.commit();
cachedMsgCnt.incrementAndGet();
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, true, event.getBody().length);
} else {
tx.rollback();
- this.metricItemSet.fillSinkReadMetricItemsByEvent(
- event, false, event.getBody().length);
}
} else {
status = Status.BACKOFF;