darenwkt commented on code in PR #22:
URL:
https://github.com/apache/flink-connector-prometheus/pull/22#discussion_r2125913882
##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSink.java:
##########
@@ -18,36 +18,13 @@
package org.apache.flink.connector.prometheus.sink;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
-import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
-import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
-import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
import org.apache.flink.connector.prometheus.sink.prometheus.Types;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
-
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.util.Collection;
/** Sink implementation accepting {@link PrometheusTimeSeries} as inputs. */
@PublicEvolving
-public class PrometheusSink extends AsyncSinkBase<PrometheusTimeSeries,
Types.TimeSeries> {
- private final String prometheusRemoteWriteUrl;
- private final PrometheusAsyncHttpClientBuilder clientBuilder;
- private final PrometheusRequestSigner requestSigner;
- private final int maxBatchSizeInSamples;
- private final String httpUserAgent;
- private final
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
- errorHandlingBehaviorConfig;
- private final String metricGroupName;
-
- @SuppressWarnings("checkstyle:RegexpSingleline")
+public class PrometheusSink extends PrometheusSinkBase<PrometheusTimeSeries> {
Review Comment:
Yes, agree on a major version bump; the latest revision has removed the
coupling, please take a look
##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/sink/PrometheusSinkBase.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.sink;
+
+import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import
org.apache.flink.connector.prometheus.sink.http.PrometheusAsyncHttpClientBuilder;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetrics;
+import org.apache.flink.connector.prometheus.sink.metrics.SinkMetricsCallback;
+import org.apache.flink.connector.prometheus.sink.prometheus.Types;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Collection;
+
+public class PrometheusSinkBase<IN> extends AsyncSinkBase<IN,
Types.TimeSeries> {
+ private final String prometheusRemoteWriteUrl;
+ private final PrometheusAsyncHttpClientBuilder clientBuilder;
+ private final PrometheusRequestSigner requestSigner;
+ private final int maxBatchSizeInSamples;
+ private final int maxRecordSizeInSamples;
+ private final String httpUserAgent;
+ private final
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+ errorHandlingBehaviorConfig;
+ private final String metricGroupName;
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ protected PrometheusSinkBase(
+ ElementConverter<IN, Types.TimeSeries> elementConverter,
+ int maxInFlightRequests,
+ int maxBufferedRequests,
+ int maxBatchSizeInSamples,
+ int maxRecordSizeInSamples,
+ long maxTimeInBufferMS,
+ String prometheusRemoteWriteUrl,
+ PrometheusAsyncHttpClientBuilder clientBuilder,
+ PrometheusRequestSigner requestSigner,
+ String httpUserAgent,
+
PrometheusSinkConfiguration.SinkWriterErrorHandlingBehaviorConfiguration
+ errorHandlingBehaviorConfig,
+ String metricGroupName) {
+ // This sink batches in terms of "samples", because writes to
Prometheus are better
+ // optimized in terms of samples. AsyncSinkBase handles batching and
does not make any
+ // assumptions about the actual unit of "size", but parameters are
named assuming this unit
+ // is "bytes".
+ super(
+ elementConverter,
+ maxBatchSizeInSamples, // maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ maxBatchSizeInSamples, // maxBatchSizeInBytes,
+ maxTimeInBufferMS,
+ maxRecordSizeInSamples // maxRecordSizeInBytes
+ );
+
+ Preconditions.checkArgument(
+ maxBatchSizeInSamples > 1, "Max batch size (in samples) must
be positive");
+ Preconditions.checkArgument(
+ maxRecordSizeInSamples <= maxBatchSizeInSamples,
+ "Max record size (in samples) must be <= Max batch size");
+ Preconditions.checkArgument(maxInFlightRequests == 1, "Max in-flight
requests must be 1");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(prometheusRemoteWriteUrl),
+ "Missing or blank Prometheus Remote-Write URL");
+ checkValidRemoteWriteUrl(prometheusRemoteWriteUrl);
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(httpUserAgent), "Missing HTTP User
Agent string");
+ Preconditions.checkNotNull(
+ errorHandlingBehaviorConfig, "Missing error handling
configuration");
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(metricGroupName), "Missing metric group
name");
+ this.maxBatchSizeInSamples = maxBatchSizeInSamples;
+ this.maxRecordSizeInSamples = maxRecordSizeInSamples;
+ this.requestSigner = requestSigner;
+ this.prometheusRemoteWriteUrl = prometheusRemoteWriteUrl;
+ this.clientBuilder = clientBuilder;
+ this.httpUserAgent = httpUserAgent;
+ this.errorHandlingBehaviorConfig = errorHandlingBehaviorConfig;
+ this.metricGroupName = metricGroupName;
+ }
+
+ @Override
+ public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>>
createWriter(
+ InitContext initContext) {
+ SinkMetricsCallback metricsCallback =
+ new SinkMetricsCallback(
+ SinkMetrics.registerSinkMetrics(
+
initContext.metricGroup().addGroup(metricGroupName)));
+ CloseableHttpAsyncClient asyncHttpClient =
+ clientBuilder.buildAndStartClient(metricsCallback);
+
+ return new PrometheusSinkWriter<>(
+ getElementConverter(),
+ initContext,
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ maxBatchSizeInSamples,
+ maxRecordSizeInSamples,
+ getMaxTimeInBufferMS(),
+ prometheusRemoteWriteUrl,
+ asyncHttpClient,
+ metricsCallback,
+ requestSigner,
+ httpUserAgent,
+ errorHandlingBehaviorConfig);
+ }
+
+ @Override
+ public StatefulSinkWriter<IN, BufferedRequestState<Types.TimeSeries>>
restoreWriter(
+ InitContext initContext,
+ Collection<BufferedRequestState<Types.TimeSeries>> recoveredState)
{
+ SinkMetricsCallback metricsCallback =
+ new SinkMetricsCallback(
+ SinkMetrics.registerSinkMetrics(
+
initContext.metricGroup().addGroup(metricGroupName)));
+ CloseableHttpAsyncClient asyncHttpClient =
+ clientBuilder.buildAndStartClient(metricsCallback);
+ return new PrometheusSinkWriter<>(
+ getElementConverter(),
+ initContext,
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ maxBatchSizeInSamples,
+ maxRecordSizeInSamples,
+ getMaxTimeInBufferMS(),
+ prometheusRemoteWriteUrl,
+ asyncHttpClient,
+ metricsCallback,
+ requestSigner,
+ httpUserAgent,
+ errorHandlingBehaviorConfig,
+ recoveredState);
+ }
+
+ public PrometheusSink castToPrometheusSink() {
+ return new PrometheusSink(
+ new PrometheusTimeSeriesConverter(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ maxBatchSizeInSamples,
+ maxRecordSizeInSamples,
+ getMaxTimeInBufferMS(),
+ prometheusRemoteWriteUrl,
+ clientBuilder,
+ requestSigner,
+ httpUserAgent,
+ errorHandlingBehaviorConfig,
+ metricGroupName);
+ }
Review Comment:
Yes, agree on a major version bump; the latest revision has fixed the
interface, please take a look
##########
flink-connector-prometheus/src/main/java/org/apache/flink/connector/prometheus/table/RowDataToPrometheusTimeSeriesConverter.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.prometheus.table;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.prometheus.sink.PrometheusTimeSeries;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.data.RowData.createFieldGetter;
+
+/**
+ * Converts from Flink Table API internal type of {@link RowData} to {@link
PrometheusTimeSeries}.
+ */
+@Internal
+public class RowDataToPrometheusTimeSeriesConverter {
+
+ private final DataType physicalDataType;
+ private final PrometheusConfig prometheusConfig;
+
+ public RowDataToPrometheusTimeSeriesConverter(
+ DataType physicalDataType, PrometheusConfig prometheusConfig) {
+ this.physicalDataType = physicalDataType;
+ this.prometheusConfig = prometheusConfig;
+ }
+
+ public PrometheusTimeSeries convertRowData(RowData row) {
+ List<DataTypes.Field> fields = DataType.getFields(physicalDataType);
+
+ PrometheusTimeSeries.Builder builder = PrometheusTimeSeries.builder();
+ Double sampleValue = null;
+ Long sampleTimestamp = null;
+
+ for (int i = 0; i < fields.size(); i++) {
+ DataTypes.Field field = fields.get(i);
+ RowData.FieldGetter fieldGetter =
+
createFieldGetter(fields.get(i).getDataType().getLogicalType(), i);
+ FieldValue fieldValue = new
FieldValue(fieldGetter.getFieldOrNull(row));
+ String fieldName = field.getName();
+
+ if (fieldName.equals(prometheusConfig.getMetricName())) {
+ builder.withMetricName(fieldValue.getStringValue());
+ } else if
(fieldName.equals(prometheusConfig.getMetricSampleKey())) {
+ sampleValue = fieldValue.getDoubleValue();
+ } else if (prometheusConfig.getLabelKeys().contains(fieldName)) {
+ builder.addLabel(fieldName, fieldValue.getStringValue());
+ } else if
(fieldName.equals(prometheusConfig.getMetricSampleTimestamp())) {
+ sampleTimestamp = fieldValue.getLongValue();
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "%s is not a supported field, valid fields
are: %s",
+ fieldName,
+ Arrays.asList(
+ prometheusConfig.getMetricName(),
+ prometheusConfig.getLabelKeys(),
+ prometheusConfig.getMetricSampleKey(),
+
prometheusConfig.getMetricSampleTimestamp())));
+ }
+ }
+
+ if (sampleValue != null && sampleTimestamp != null) {
+ builder.addSample(sampleValue, sampleTimestamp);
+ } else {
+ throw new IllegalArgumentException(
+ String.format(
+ "Row is missing sampleValue field; %s or
sampleTimestamp: %s",
+ sampleValue, sampleTimestamp));
+ }
+
+ return builder.build();
+ }
+
+ private static class FieldValue {
+ private final Object value;
+
+ private FieldValue(Object value) {
+ this.value = value;
+ }
+
+ private String getStringValue() {
+ return value.toString();
+ }
Review Comment:
This is a good call out: we only support the
`org.apache.flink.table.data.StringData` type here, and validation for this
has been added in the latest revision
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]