This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 7617fa00748b5e9d8fa1d6a7998885f461e91b60 Author: thesumery <[email protected]> AuthorDate: Fri Nov 4 16:34:50 2022 +0800 [INLONG-6379][Sort] Iceberg misses metric data in multiple sink scenes (#6381) --- .../apache/inlong/sort/iceberg/sink/FlinkSink.java | 3 +- .../sink/multiple/IcebergMultipleStreamWriter.java | 59 +++++++++++++++++++++- 2 files changed, 60 insertions(+), 2 deletions(-) diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java index bb7498650..25f9e963b 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/FlinkSink.java @@ -528,7 +528,8 @@ public class FlinkSink { .setParallelism(parallelism); IcebergProcessOperator streamWriter = - new IcebergProcessOperator(new IcebergMultipleStreamWriter(appendMode, catalogLoader)); + new IcebergProcessOperator(new IcebergMultipleStreamWriter( + appendMode, catalogLoader, inlongMetric, auditHostAndPorts)); SingleOutputStreamOperator<MultipleWriteResult> writerStream = routeStream .transform(operatorName(ICEBERG_MULTIPLE_STREAM_WRITER_NAME), TypeInformation.of(IcebergProcessOperator.class), diff --git a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java index 4c3fb0045..617eb6d69 100644 --- a/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java +++ b/inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/multiple/IcebergMultipleStreamWriter.java @@ -18,6 +18,10 @@ package 
org.apache.inlong.sort.iceberg.sink.multiple; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; @@ -34,10 +38,16 @@ import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.flink.sink.TaskWriterFactory; import org.apache.iceberg.types.Types.NestedField; import org.apache.iceberg.util.PropertyUtil; +import org.apache.inlong.sort.base.metric.MetricOption; +import org.apache.inlong.sort.base.metric.MetricOption.RegisteredMetric; +import org.apache.inlong.sort.base.metric.MetricState; +import org.apache.inlong.sort.base.metric.SinkMetricData; +import org.apache.inlong.sort.base.util.MetricStateUtils; import org.apache.inlong.sort.iceberg.sink.RowDataTaskWriterFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.Closeable; import java.util.HashMap; import java.util.List; @@ -52,6 +62,9 @@ import static org.apache.iceberg.TableProperties.UPSERT_ENABLED; import static org.apache.iceberg.TableProperties.UPSERT_ENABLED_DEFAULT; import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES; import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT; +import static org.apache.inlong.sort.base.Constants.INLONG_METRIC_STATE_NAME; +import static org.apache.inlong.sort.base.Constants.NUM_BYTES_OUT; +import static org.apache.inlong.sort.base.Constants.NUM_RECORDS_OUT; /** * Iceberg writer that can distinguish different sink tables and route and distribute data into different @@ -70,9 +83,23 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi private transient 
Map<TableIdentifier, Schema> multipleSchemas; private transient FunctionInitializationContext functionInitializationContext; - public IcebergMultipleStreamWriter(boolean appendMode, CatalogLoader catalogLoader) { + // metric + private final String inlongMetric; + private final String auditHostAndPorts; + @Nullable + private transient SinkMetricData metricData; + private transient ListState<MetricState> metricStateListState; + private transient MetricState metricState; + + public IcebergMultipleStreamWriter( + boolean appendMode, + CatalogLoader catalogLoader, + String inlongMetric, + String auditHostAndPorts) { this.appendMode = appendMode; this.catalogLoader = catalogLoader; + this.inlongMetric = inlongMetric; + this.auditHostAndPorts = auditHostAndPorts; } @Override @@ -81,6 +108,18 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi this.multipleWriters = new HashMap<>(); this.multipleTables = new HashMap<>(); this.multipleSchemas = new HashMap<>(); + + // Initialize metric + MetricOption metricOption = MetricOption.builder() + .withInlongLabels(inlongMetric) + .withInlongAudit(auditHostAndPorts) + .withInitRecords(metricState != null ? metricState.getMetricValue(NUM_RECORDS_OUT) : 0L) + .withInitBytes(metricState != null ? 
metricState.getMetricValue(NUM_BYTES_OUT) : 0L) + .withRegisterMetric(RegisteredMetric.ALL) + .build(); + if (metricOption != null) { + metricData = new SinkMetricData(metricOption, getRuntimeContext().getMetricGroup()); + } } @Override @@ -185,11 +224,29 @@ public class IcebergMultipleStreamWriter extends IcebergProcessFunction<RecordWi for (Entry<TableIdentifier, IcebergSingleStreamWriter<RowData>> entry: multipleWriters.entrySet()) { entry.getValue().snapshotState(context); } + + // metric + if (metricData != null && metricStateListState != null) { + MetricStateUtils.snapshotMetricStateForSinkMetricData(metricStateListState, metricData, + getRuntimeContext().getIndexOfThisSubtask()); + } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { this.functionInitializationContext = context; + + // init metric state + if (this.inlongMetric != null) { + this.metricStateListState = context.getOperatorStateStore().getUnionListState( + new ListStateDescriptor<>( + INLONG_METRIC_STATE_NAME, TypeInformation.of(new TypeHint<MetricState>() { + }))); + } + if (context.isRestored()) { + metricState = MetricStateUtils.restoreMetricState(metricStateListState, + getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); + } } private boolean isSchemaUpdate(RecordWithSchema recordWithSchema) {
