This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d37c0a6ccd [INLONG-9380][Sort] Audit lost when stop job immediately after checkpoint (#9396)
d37c0a6ccd is described below
commit d37c0a6ccd465aba38e2612faa9ee99606493d11
Author: vernedeng <[email protected]>
AuthorDate: Mon Dec 4 14:20:44 2023 +0800
[INLONG-9380][Sort] Audit lost when stop job immediately after checkpoint (#9396)
---
.../inlong/sort/base/metric/SourceMetricData.java | 20 ++++++++++----------
.../sort/iceberg/sink/IcebergStreamWriter.java | 6 ++++++
.../iceberg/sink/IcebergStreamWriterMetrics.java | 6 ++++++
.../iceberg/source/reader/IcebergSourceReader.java | 8 ++++++++
.../reader/InlongIcebergSourceReaderMetrics.java | 6 ++++++
.../inlong/sort/tubemq/FlinkTubeMQConsumer.java | 2 ++
.../table/DynamicTubeMQDeserializationSchema.java | 2 ++
.../DynamicTubeMQTableDeserializationSchema.java | 7 +++++++
8 files changed, 47 insertions(+), 10 deletions(-)
diff --git
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
index 1e1a624762..91abcf22aa 100644
---
a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
+++
b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/SourceMetricData.java
@@ -298,16 +298,6 @@ public class SourceMetricData implements MetricData,
Serializable {
}
}
- /**
- * flush audit data
- * usually call this method in close method or when checkpointing
- */
- public void flushAuditData() {
- if (auditOperator != null) {
- auditOperator.send();
- }
- }
-
public void outputMetrics(long rowCountSize, long rowDataSize, long
dataTime) {
outputDefaultMetrics(rowCountSize, rowDataSize);
if (auditOperator != null) {
@@ -345,6 +335,16 @@ public class SourceMetricData implements MetricData,
Serializable {
}
}
+ /**
+ * Flushes audit data by asking the audit operator to send it.
+ * Intended to be called from close() or while taking a checkpoint.
+ * Does nothing when no audit operator has been set.
+ */
+ public void flushAuditData() {
+ if (auditOperator == null) {
+ return;
+ }
+ auditOperator.send();
+ }
+
private void outputDefaultMetrics(long rowCountSize, long rowDataSize,
long fetchDelay, long emitDelay) {
outputDefaultMetrics(rowCountSize, rowDataSize);
this.fetchDelay = fetchDelay;
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
index 0cf31c206e..b318306380 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriter.java
@@ -20,6 +20,7 @@ package org.apache.inlong.sort.iceberg.sink;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.iceberg.utils.SinkMetadataUtils;
+import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -78,6 +79,11 @@ class IcebergStreamWriter<T> extends
AbstractStreamOperator<WriteResult>
this.writer = taskWriterFactory.create();
}
+ /**
+ * Flushes audit data as part of every checkpoint, then runs the base operator's
+ * snapshot hook so any state handling it performs is not skipped.
+ *
+ * @param context snapshot context supplied by the runtime
+ * @throws Exception if the base snapshot hook fails
+ */
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ // flush on checkpoint so audit data is not lost if the job is stopped right afterwards
+ writerMetrics.flushAudit();
+ super.snapshotState(context);
+ }
+
@Override
public void processElement(StreamRecord<T> element) throws Exception {
T data = element.getValue();
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
index 1d627714bc..72ca7e0cf5 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/IcebergStreamWriterMetrics.java
@@ -113,4 +113,10 @@ class IcebergStreamWriterMetrics {
sourceMetricData.outputMetrics(1, size, time);
}
}
+
+ /** Delegates to {@code SourceMetricData#flushAuditData()}; no-op when no source metric data is set. */
+ void flushAudit() {
+ if (sourceMetricData == null) {
+ return;
+ }
+ sourceMetricData.flushAuditData();
+ }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
index ad3a9b13d4..df75723ceb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/IcebergSourceReader.java
@@ -28,6 +28,7 @@ import
org.apache.iceberg.relocated.com.google.common.collect.Lists;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
/**
@@ -38,6 +39,7 @@ public class IcebergSourceReader<T>
extends
SingleThreadMultiplexSourceReaderBase<RecordAndPosition<T>, T,
IcebergSourceSplit, IcebergSourceSplit> {
+ /** Reader metrics, retained so audit data can be flushed in {@link #snapshotState(long)}. */
+ private final InlongIcebergSourceReaderMetrics<T> metrics;
public IcebergSourceReader(
InlongIcebergSourceReaderMetrics<T> metrics,
ReaderFunction<T> readerFunction,
@@ -47,6 +49,7 @@ public class IcebergSourceReader<T>
new IcebergSourceRecordEmitter<>(),
context.getConfiguration(),
context);
+ this.metrics = metrics;
}
@Override
@@ -62,6 +65,11 @@ public class IcebergSourceReader<T>
protected void onSplitFinished(Map<String, IcebergSourceSplit>
finishedSplitIds) {
requestSplit(Lists.newArrayList(finishedSplitIds.keySet()));
}
+ /**
+ * Flushes audit data, then delegates the split-state snapshot to the base reader.
+ *
+ * @param checkpointId id of the checkpoint being taken
+ * @return the splits snapshotted by the base reader
+ */
+ @Override
+ public List<IcebergSourceSplit> snapshotState(long checkpointId) {
+ metrics.flushAudit();
+ final List<IcebergSourceSplit> snapshot = super.snapshotState(checkpointId);
+ return snapshot;
+ }
@Override
protected IcebergSourceSplit initializedState(IcebergSourceSplit split) {
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
index 252ae4580d..2210fbca02 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/source/reader/InlongIcebergSourceReaderMetrics.java
@@ -77,4 +77,10 @@ public class InlongIcebergSourceReaderMetrics<T> extends
IcebergSourceReaderMetr
}
return object.toString().getBytes(StandardCharsets.UTF_8).length;
}
+
+ /** Delegates to {@code SourceMetricData#flushAuditData()}; no-op when no source metric data is set. */
+ void flushAudit() {
+ if (sourceMetricData == null) {
+ return;
+ }
+ sourceMetricData.flushAuditData();
+ }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
index b9fb6d1b0d..1f261cfef5 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/FlinkTubeMQConsumer.java
@@ -317,6 +317,8 @@ public class FlinkTubeMQConsumer<T> extends
RichParallelSourceFunction<T>
offsetsState.add(new Tuple2<>(entry.getKey(), entry.getValue()));
}
+ deserializationSchema.flushAudit();
+
LOG.info("Successfully save the offsets in checkpoint {}: {}.",
context.getCheckpointId(), currentOffsets);
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
index 4c4eaac841..c6ec9ea9cb 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQDeserializationSchema.java
@@ -58,4 +58,6 @@ public interface DynamicTubeMQDeserializationSchema<T>
extends Serializable, Res
out.collect(deserialize);
}
}
+
+ /**
+ * Flushes pending audit data, e.g. while the consumer takes a checkpoint.
+ * The default implementation does nothing, so existing implementations of this
+ * interface keep compiling; schemas that report audit data should override it.
+ */
+ default void flushAudit() {
+ }
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
index 8ee154c535..3f2a57d7c7 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/DynamicTubeMQTableDeserializationSchema.java
@@ -109,6 +109,13 @@ public class DynamicTubeMQTableDeserializationSchema
implements DynamicTubeMQDes
}
+ /** Delegates to {@code SourceMetricData#flushAuditData()}; no-op when no source metric data is set. */
+ @Override
+ public void flushAudit() {
+ if (sourceMetricData == null) {
+ return;
+ }
+ sourceMetricData.flushAuditData();
+ }
+
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;