This is an automated email from the ASF dual-hosted git repository. renqs pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new 221e9f7c5e5 [FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink 221e9f7c5e5 is described below commit 221e9f7c5e59265b7448eba70aea393b60a1fe42 Author: Qingsheng Ren <renqs...@gmail.com> AuthorDate: Fri Oct 14 16:32:52 2022 +0800 [FLINK-29567][connector/common] Change numRecordsSend / numBytesSend / numRecordsSendError back to numRecordsOut / numBytesOut / numRecordsOutError in sink --- .../firehose/sink/KinesisFirehoseSinkWriter.java | 9 +------- .../kinesis/sink/KinesisStreamsSinkWriter.java | 8 +------ .../base/sink/writer/AsyncSinkWriter.java | 12 +++++----- .../base/sink/writer/TestSinkInitContext.java | 4 ++-- .../connector/file/sink/writer/FileWriter.java | 7 +++--- .../connector/file/sink/writer/FileWriterTest.java | 2 +- .../flink/connector/kafka/sink/KafkaWriter.java | 18 ++++++--------- .../connector/kafka/sink/KafkaWriterITCase.java | 22 ++++++++---------- .../metrics/groups/SinkWriterMetricGroup.java | 17 ++++++++++---- .../apache/flink/runtime/metrics/MetricNames.java | 3 +-- .../groups/InternalOperatorIOMetricGroup.java | 6 +++-- .../groups/InternalSinkWriterMetricGroup.java | 16 ++++++++----- .../api/operators/AbstractStreamOperator.java | 11 +++++---- .../runtime/operators/sink/SinkWriterOperator.java | 18 +++++++++++++++ .../test/streaming/runtime/SinkMetricsITCase.java | 26 ++++++++++++++-------- 15 files changed, 101 insertions(+), 78 deletions(-) diff --git a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java index 24a17eee292..a60990676bb 100644 --- a/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java +++ b/flink-connectors/flink-connector-aws-kinesis-firehose/src/main/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkWriter.java @@ -95,11 +95,7 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER, getSdkClientMisconfiguredExceptionClassifier()); - // deprecated, use numRecordsSendErrorsCounter instead. - @Deprecated private final Counter numRecordsOutErrorsCounter; - - /* A counter for the total number of records that have encountered an error during put */ - private final Counter numRecordsSendErrorsCounter; + private final Counter numRecordsOutErrorsCounter; /* Name of the delivery stream in Kinesis Data Firehose */ private final String deliveryStreamName; @@ -170,7 +166,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> this.deliveryStreamName = deliveryStreamName; this.metrics = context.metricGroup(); this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter(); - this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter(); this.httpClient = createHttpClient(firehoseClientProperties); this.firehoseClient = createFirehoseClient(firehoseClientProperties, httpClient); } @@ -218,7 +213,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> requestEntries.get(0).toString(), err); numRecordsOutErrorsCounter.inc(requestEntries.size()); - numRecordsSendErrorsCounter.inc(requestEntries.size()); if (isRetryable(err)) { requestResult.accept(requestEntries); @@ -234,7 +228,6 @@ class KinesisFirehoseSinkWriter<InputT> extends AsyncSinkWriter<InputT, Record> requestEntries.size(), requestEntries.get(0).toString()); numRecordsOutErrorsCounter.inc(response.failedPutCount()); - numRecordsSendErrorsCounter.inc(response.failedPutCount()); if (failOnError) { getFatalExceptionCons() diff --git a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java index a3be9136c3b..fe264e43d69 100644 --- a/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java +++ b/flink-connectors/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/sink/KinesisStreamsSinkWriter.java @@ -76,11 +76,7 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord RESOURCE_NOT_FOUND_EXCEPTION_CLASSIFIER, getSdkClientMisconfiguredExceptionClassifier()); - // deprecated, use numRecordsSendErrorsCounter instead. - @Deprecated private final Counter numRecordsOutErrorsCounter; - - /* A counter for the total number of records that have encountered an error during put */ - private final Counter numRecordsSendErrorsCounter; + private final Counter numRecordsOutErrorsCounter; /* Name of the stream in Kinesis Data Streams */ private final String streamName; @@ -151,7 +147,6 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord this.streamName = streamName; this.metrics = context.metricGroup(); this.numRecordsOutErrorsCounter = metrics.getNumRecordsOutErrorsCounter(); - this.numRecordsSendErrorsCounter = metrics.getNumRecordsSendErrorsCounter(); this.httpClient = AWSGeneralUtil.createAsyncHttpClient(kinesisClientProperties); this.kinesisClient = buildClient(kinesisClientProperties, this.httpClient); } @@ -204,7 +199,6 @@ class KinesisStreamsSinkWriter<InputT> extends AsyncSinkWriter<InputT, PutRecord requestEntries.size(), err); numRecordsOutErrorsCounter.inc(requestEntries.size()); - numRecordsSendErrorsCounter.inc(requestEntries.size()); if (isRetryable(err)) { requestResult.accept(requestEntries); diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java index f7a649b1636..812b36d4d7d 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java @@ -69,10 +69,10 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable private final SinkWriterMetricGroup metrics; /* Counter for number of bytes this sink has attempted to send to the destination. */ - private final Counter numBytesSendCounter; + private final Counter numBytesOutCounter; /* Counter for number of records this sink has attempted to send to the destination. */ - private final Counter numRecordsSendCounter; + private final Counter numRecordsOutCounter; /** * Rate limiting strategy {@code inflightMessages} at any given time, {@code @@ -295,8 +295,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable this.metrics = context.metricGroup(); this.metrics.setCurrentSendTimeGauge(() -> this.ackTime - this.lastSendTimestamp); - this.numBytesSendCounter = this.metrics.getNumBytesSendCounter(); - this.numRecordsSendCounter = this.metrics.getNumRecordsSendCounter(); + this.numBytesOutCounter = this.metrics.getIOMetricGroup().getNumBytesOutCounter(); + this.numRecordsOutCounter = this.metrics.getIOMetricGroup().getNumRecordsOutCounter(); this.fatalExceptionCons = exception -> @@ -417,8 +417,8 @@ public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable batchSizeBytes += requestEntrySize; } - numRecordsSendCounter.inc(batch.size()); - numBytesSendCounter.inc(batchSizeBytes); + numRecordsOutCounter.inc(batch.size()); + numBytesOutCounter.inc(batchSizeBytes); return batch; } diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java index a7e4979efdf..b1461903fc8 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/TestSinkInitContext.java @@ -146,10 +146,10 @@ public class TestSinkInitContext implements Sink.InitContext { } public Counter getNumRecordsOutCounter() { - return metricGroup.getNumRecordsSendCounter(); + return metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); } public Counter getNumBytesOutCounter() { - return metricGroup.getNumBytesSendCounter(); + return metricGroup.getIOMetricGroup().getNumBytesOutCounter(); } } diff --git a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java index 51cc6d8c10f..1d0de3b28e5 100644 --- a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java +++ b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java @@ -92,7 +92,7 @@ public class FileWriter<IN> private final OutputFileConfig outputFileConfig; - private final Counter numRecordsSendCounter; + private final Counter numRecordsOutCounter; private boolean endOfInput; @@ -128,7 +128,8 @@ public class FileWriter<IN> this.activeBuckets = new HashMap<>(); this.bucketerContext = new BucketerContext(); - this.numRecordsSendCounter = checkNotNull(metricGroup).getNumRecordsSendCounter(); + this.numRecordsOutCounter = + checkNotNull(metricGroup).getIOMetricGroup().getNumRecordsOutCounter(); this.processingTimeService = checkNotNull(processingTimeService); checkArgument( bucketCheckInterval > 0, @@ -195,7 +196,7 @@ public class FileWriter<IN> final String bucketId = bucketAssigner.getBucketId(element, bucketerContext); final FileWriterBucket<IN> bucket = getOrCreateBucketForBucketId(bucketId); bucket.write(element, processingTimeService.getCurrentProcessingTime()); - numRecordsSendCounter.inc(); + numRecordsOutCounter.inc(); } @Override diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java index c0a5ac66776..6a63a7957a6 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/sink/writer/FileWriterTest.java @@ -298,7 +298,7 @@ public class FileWriterTest { InternalSinkWriterMetricGroup.mock( metricListener.getMetricGroup(), operatorIOMetricGroup); - Counter recordsCounter = sinkWriterMetricGroup.getNumRecordsSendCounter(); + Counter recordsCounter = sinkWriterMetricGroup.getIOMetricGroup().getNumRecordsOutCounter(); SinkWriter.Context context = new ContextImpl(); FileWriter<String> fileWriter = createWriter( diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java index cd46d67f786..53e48d9cd6e 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java @@ -93,11 +93,9 @@ class KafkaWriter<IN> private final Map<String, KafkaMetricMutableWrapper> previouslyCreatedMetrics = new HashMap<>(); private final SinkWriterMetricGroup metricGroup; private final boolean disabledMetrics; - private final Counter numRecordsSendCounter; - private final Counter numBytesSendCounter; - // deprecated, use numRecordsSendErrorsCounter instead. - @Deprecated private final Counter numRecordsOutErrorsCounter; - private final Counter numRecordsSendErrorsCounter; + private final Counter numRecordsOutCounter; + private final Counter numBytesOutCounter; + private final Counter numRecordsOutErrorsCounter; private final ProcessingTimeService timeService; // Number of outgoing bytes at the latest metric sync @@ -155,10 +153,9 @@ class KafkaWriter<IN> checkNotNull(sinkInitContext, "sinkInitContext"); this.timeService = sinkInitContext.getProcessingTimeService(); this.metricGroup = sinkInitContext.metricGroup(); - this.numBytesSendCounter = metricGroup.getNumBytesSendCounter(); - this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter(); + this.numBytesOutCounter = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + this.numRecordsOutCounter = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); this.numRecordsOutErrorsCounter = metricGroup.getNumRecordsOutErrorsCounter(); - this.numRecordsSendErrorsCounter = metricGroup.getNumRecordsSendErrorsCounter(); this.kafkaSinkContext = new DefaultKafkaSinkContext( sinkInitContext.getSubtaskId(), @@ -198,7 +195,7 @@ class KafkaWriter<IN> final ProducerRecord<byte[], byte[]> record = recordSerializer.serialize(element, kafkaSinkContext, context.timestamp()); currentProducer.send(record, deliveryCallback); - numRecordsSendCounter.inc(); + numRecordsOutCounter.inc(); } @Override @@ -391,7 +388,7 @@ class KafkaWriter<IN> long outgoingBytesUntilNow = ((Number) byteOutMetric.metricValue()).longValue(); long outgoingBytesSinceLastUpdate = outgoingBytesUntilNow - latestOutgoingByteTotal; - numBytesSendCounter.inc(outgoingBytesSinceLastUpdate); + numBytesOutCounter.inc(outgoingBytesSinceLastUpdate); latestOutgoingByteTotal = outgoingBytesUntilNow; lastSync = time; registerMetricSync(); @@ -417,7 +414,6 @@ class KafkaWriter<IN> mailboxExecutor.execute( () -> { numRecordsOutErrorsCounter.inc(); - numRecordsSendErrorsCounter.inc(); throwException(metadata, exception, producer); }, "Failed to send data to Kafka"); diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index 8c7e50aa704..bea559c107e 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -140,21 +140,21 @@ public class KafkaWriterITCase { try (final KafkaWriter<Integer> writer = createWriterWithConfiguration( getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { - final Counter numBytesSend = metricGroup.getNumBytesSendCounter(); - final Counter numRecordsSend = metricGroup.getNumRecordsSendCounter(); - final Counter numRecordsWrittenErrors = metricGroup.getNumRecordsOutErrorsCounter(); + final Counter numBytesOut = metricGroup.getIOMetricGroup().getNumBytesOutCounter(); + final Counter numRecordsOut = metricGroup.getIOMetricGroup().getNumRecordsOutCounter(); + final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter(); - assertThat(numBytesSend.getCount()).isEqualTo(0L); - assertThat(numRecordsSend.getCount()).isEqualTo(0); - assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0); + assertThat(numBytesOut.getCount()).isEqualTo(0L); + assertThat(numRecordsOut.getCount()).isEqualTo(0); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); writer.write(1, SINK_WRITER_CONTEXT); timeService.trigger(); - assertThat(numRecordsSend.getCount()).isEqualTo(1); - assertThat(numRecordsWrittenErrors.getCount()).isEqualTo(0); + assertThat(numRecordsOut.getCount()).isEqualTo(1); + assertThat(numRecordsOutErrors.getCount()).isEqualTo(0); assertThat(numRecordsSendErrors.getCount()).isEqualTo(0); - assertThat(numBytesSend.getCount()).isGreaterThan(0L); + assertThat(numBytesOut.getCount()).isGreaterThan(0L); } } @@ -198,13 +198,10 @@ public class KafkaWriterITCase { createWriterWithConfiguration( properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup)) { final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); - final Counter numRecordsSendErrors = metricGroup.getNumRecordsSendErrorsCounter(); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); - assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L); writer.write(1, SINK_WRITER_CONTEXT); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); - assertThat(numRecordsSendErrors.getCount()).isEqualTo(0L); final String transactionalId = writer.getCurrentProducer().getTransactionalId(); @@ -221,7 +218,6 @@ public class KafkaWriterITCase { writer.flush(false); writer.prepareCommit(); assertThat(numRecordsOutErrors.getCount()).isEqualTo(1L); - assertThat(numRecordsSendErrors.getCount()).isEqualTo(1L); } } diff --git a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java index 1da6c4b8a29..f3f81201436 100644 --- a/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java +++ b/flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/SinkWriterMetricGroup.java @@ -29,11 +29,14 @@ import org.apache.flink.metrics.Gauge; @PublicEvolving public interface SinkWriterMetricGroup extends OperatorMetricGroup { - /** @deprecated use {@link #getNumRecordsSendErrorsCounter()} instead. */ - @Deprecated + /** The total number of records failed to send. */ Counter getNumRecordsOutErrorsCounter(); - /** The total number of records failed to send. */ + /** + * The total number of records failed to send. + * + * <p>This metric has the same value as {@code numRecordsOutError}. + */ Counter getNumRecordsSendErrorsCounter(); /** @@ -44,10 +47,16 @@ public interface SinkWriterMetricGroup extends OperatorMetricGroup { * may have issue to perform the persistence action within its scope. Therefore, this count may * include the number of records that are failed to write by the downstream system, which should * be counted by {@link #getNumRecordsSendErrorsCounter()}. + * + * <p>This metric has the same value as {@code numRecordsOut} of the operator. */ Counter getNumRecordsSendCounter(); - /** The total number of output send bytes since the task started. */ + /** + * The total number of output send bytes since the task started. + * + * <p>This metric has the same value as {@code numBytesOut} of the operator + */ Counter getNumBytesSendCounter(); /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java index ee922d11649..1ac400510c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/MetricNames.java @@ -84,8 +84,7 @@ public class MetricNames { public static final String DEBLOATED_BUFFER_SIZE = "debloatedBufferSize"; // FLIP-33 sink - // deprecated use NUM_RECORDS_SEND_ERRORS instead. - @Deprecated public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors"; + public static final String NUM_RECORDS_OUT_ERRORS = "numRecordsOutErrors"; public static final String NUM_RECORDS_SEND_ERRORS = "numRecordsSendErrors"; public static final String CURRENT_SEND_TIME = "currentSendTime"; public static final String NUM_RECORDS_SEND = "numRecordsSend"; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java index f03ab5206d1..31cf560ce78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalOperatorIOMetricGroup.java @@ -52,8 +52,10 @@ public class InternalOperatorIOMetricGroup extends ProxyMetricGroup<InternalOper numRecordsOutRate = parentMetricGroup.meter( MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOut)); - numBytesIn = parentMetricGroup.getTaskIOMetricGroup().getNumBytesInCounter(); - numBytesOut = parentMetricGroup.getTaskIOMetricGroup().getNumBytesOutCounter(); + numBytesIn = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_IN); + numBytesOut = parentMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT); + parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_IN_RATE, new MeterView(numBytesIn)); + parentMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOut)); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java index 9243e32cc4a..81aa8d78ce3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/InternalSinkWriterMetricGroup.java @@ -34,8 +34,7 @@ import org.apache.flink.runtime.metrics.MetricNames; public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup> implements SinkWriterMetricGroup { - // deprecated, use numRecordsSendErrors instead. - @Deprecated private final Counter numRecordsOutErrors; + private final Counter numRecordsOutErrors; private final Counter numRecordsSendErrors; private final Counter numRecordsWritten; private final Counter numBytesWritten; @@ -45,9 +44,15 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup> MetricGroup parentMetricGroup, OperatorIOMetricGroup operatorIOMetricGroup) { super(parentMetricGroup); numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS); - numRecordsSendErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS); - numRecordsWritten = parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND); - numBytesWritten = parentMetricGroup.counter(MetricNames.NUM_BYTES_SEND); + numRecordsSendErrors = + parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors); + numRecordsWritten = + parentMetricGroup.counter( + MetricNames.NUM_RECORDS_SEND, + operatorIOMetricGroup.getNumRecordsOutCounter()); + numBytesWritten = + parentMetricGroup.counter( + MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter()); this.operatorIOMetricGroup = operatorIOMetricGroup; } @@ -73,7 +78,6 @@ public class InternalSinkWriterMetricGroup extends ProxyMetricGroup<MetricGroup> return operatorIOMetricGroup; } - @Deprecated @Override public Counter getNumRecordsOutErrorsCounter() { return numRecordsOutErrors; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 3b103c22a22..a1afe279833 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -164,10 +164,7 @@ public abstract class AbstractStreamOperator<OUT> environment .getMetricGroup() .getOrAddOperator(config.getOperatorID(), config.getOperatorName()); - this.output = - new CountingOutput<>( - output, - operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()); + this.output = registerCounterOnOutput(output, operatorMetricGroup); if (config.isChainEnd()) { operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask(); } @@ -649,4 +646,10 @@ public abstract class AbstractStreamOperator<OUT> protected Optional<InternalTimeServiceManager<?>> getTimeServiceManager() { return Optional.ofNullable(timeServiceManager); } + + protected Output<StreamRecord<OUT>> registerCounterOnOutput( + Output<StreamRecord<OUT>> output, OperatorMetricGroup operatorMetricGroup) { + return new CountingOutput<>( + output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()); + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java index e593616abf7..9029584f986 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperator.java @@ -30,6 +30,7 @@ import org.apache.flink.api.connector.sink2.StatefulSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.runtime.state.StateInitializationContext; @@ -41,6 +42,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.BoundedOneInput; import org.apache.flink.streaming.api.operators.InternalTimerService; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; import org.apache.flink.streaming.api.watermark.Watermark; @@ -216,6 +218,22 @@ class SinkWriterOperator<InputT, CommT> extends AbstractStreamOperator<Committab closeAll(sinkWriter, super::close); } + /** + * Skip registering numRecordsOut counter on output. + * + * <p>Metric "numRecordsOut" is defined as the total number of records written to the external + * system in FLIP-33, but this metric is occupied in AbstractStreamOperator as the number of + * records sent to downstream operators, which is number of Committable batches sent to + * SinkCommitter. So we skip registering this metric on output and leave this metric to sink + * writer implementations to report. + */ + @Override + protected Output<StreamRecord<CommittableMessage<CommT>>> registerCounterOnOutput( + Output<StreamRecord<CommittableMessage<CommT>>> output, + OperatorMetricGroup operatorMetricGroup) { + return output; + } + private void emit( int indexOfThisSubtask, int numberOfParallelSubtasks, diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java index 90b696dd8eb..d2232010546 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkMetricsITCase.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.connector.sink.Sink; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobClient; -import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; @@ -133,18 +132,17 @@ public class SinkMetricsITCase extends TestLogger { for (OperatorMetricGroup group : groups) { Map<String, Metric> metrics = reporter.getMetricsByGroup(group); // There are only 2 splits assigned; so two groups will not update metrics. - // There is no other way to access the counter via OperatorMetricGroup, we have to use - // metrics from the reporter. - if (((Counter) metrics.get(MetricNames.NUM_RECORDS_SEND)).getCount() == 0) { + if (group.getIOMetricGroup().getNumRecordsOutCounter().getCount() == 0) { continue; } subtaskWithMetrics++; + // SinkWriterMetricGroup metrics assertThat( - metrics.get(MetricNames.NUM_RECORDS_SEND), + metrics.get(MetricNames.IO_NUM_RECORDS_OUT), isCounter(equalTo(processedRecordsPerSubtask))); assertThat( - metrics.get(MetricNames.NUM_BYTES_SEND), + metrics.get(MetricNames.IO_NUM_BYTES_OUT), isCounter( equalTo( processedRecordsPerSubtask @@ -153,6 +151,17 @@ public class SinkMetricsITCase extends TestLogger { assertThat( metrics.get(MetricNames.NUM_RECORDS_OUT_ERRORS), isCounter(equalTo((processedRecordsPerSubtask + 1) / 2))); + + // Test "send" metric series has the same value as "out" metric series. + assertThat( + metrics.get(MetricNames.NUM_RECORDS_SEND), + isCounter(equalTo(processedRecordsPerSubtask))); + assertThat( + metrics.get(MetricNames.NUM_BYTES_SEND), + isCounter( + equalTo( + processedRecordsPerSubtask + * MetricWriter.RECORD_SIZE_IN_BYTES))); assertThat( metrics.get(MetricNames.NUM_RECORDS_SEND_ERRORS), isCounter(equalTo((processedRecordsPerSubtask + 1) / 2))); @@ -183,12 +192,11 @@ public class SinkMetricsITCase extends TestLogger { public void write(Long element, Context context) { super.write(element, context); sendTime = element * BASE_SEND_TIME; - metricGroup.getNumRecordsSendCounter().inc(); + metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc(); if (element % 2 == 0) { metricGroup.getNumRecordsOutErrorsCounter().inc(); - metricGroup.getNumRecordsSendErrorsCounter().inc(); } - metricGroup.getNumBytesSendCounter().inc(RECORD_SIZE_IN_BYTES); + metricGroup.getIOMetricGroup().getNumBytesOutCounter().inc(RECORD_SIZE_IN_BYTES); } } }