[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-17 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r808976119



##
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
##
@@ -76,7 +76,7 @@ void noneAndAtLeastOnceWouldNotCreateTransaction(DeliveryGuarantee deliveryGuara
 operator().createTopic(topic, 8);
 
 SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee);
-TopicProducerRegister register = new TopicProducerRegister(configuration);
+TopicProducerRegister register = new TopicProducerRegister(configuration, null);

Review comment:
   Should not use null here; it will throw an NPE.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-17 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r808982242



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegister.java
##
@@ -152,7 +156,7 @@ public void close() throws IOException {
 builder.topic(topic);
 Producer producer = sneakyClient(builder::create);
 producers.put(schemaInfo, producer);
-
+pulsarSinkWriterMetrics.registerSingleProducerGauges(producer);

Review comment:
   We could either update producer metrics periodically via the timeService, or add the metric registration logic here. Not sure which one is better.
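One way to picture the "register it here" option is the sketch below. It uses hypothetical stand-in types (`FakeProducer`, `GaugeRegistry`, `EagerRegisteringFactory` are illustrative only, not Flink or Pulsar APIs): a gauge registered once at producer creation reads the producer's statistics lazily on every report, so its value stays current without a polling timer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for a Pulsar producer exposing one latency statistic.
final class FakeProducer {
    private volatile double sendLatency99Pct;

    double sendLatency99Pct() {
        return sendLatency99Pct;
    }

    void recordLatency(double latency) {
        this.sendLatency99Pct = latency;
    }
}

// Hypothetical gauge registry; a gauge is modeled as a Supplier read on each report.
final class GaugeRegistry {
    private final Map<String, Supplier<Double>> gauges = new LinkedHashMap<>();

    void gauge(String name, Supplier<Double> supplier) {
        gauges.put(name, supplier);
    }

    Double read(String name) {
        Supplier<Double> supplier = gauges.get(name);
        return supplier == null ? null : supplier.get();
    }
}

// "Register here" option: the gauge is registered once, when the producer is created.
final class EagerRegisteringFactory {
    private final GaugeRegistry registry;

    EagerRegisteringFactory(GaugeRegistry registry) {
        this.registry = registry;
    }

    FakeProducer create(String name) {
        FakeProducer producer = new FakeProducer();
        // The gauge captures the producer and reads its stats on every report,
        // so no periodic timer is needed to keep the value fresh.
        registry.gauge("producer." + name + ".sendLatency99Pct", producer::sendLatency99Pct);
        return producer;
    }
}
```

Under these assumptions the trade-off is that a periodic timer only notices a new producer on its next tick, while eager registration ties producer creation to the metrics code.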








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-17 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r808976119



##
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/topic/TopicProducerRegisterTest.java
##
@@ -76,7 +76,7 @@ void noneAndAtLeastOnceWouldNotCreateTransaction(DeliveryGuarantee deliveryGuara
 operator().createTopic(topic, 8);
 
 SinkConfiguration configuration = sinkConfiguration(deliveryGuarantee);
-TopicProducerRegister register = new TopicProducerRegister(configuration);
+TopicProducerRegister register = new TopicProducerRegister(configuration, null);

Review comment:
   Should not use null here; it will throw an NPE. Use UnregisteredMetricsGroup instead.
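A minimal sketch of why a no-op metrics object is preferable to `null`, using hypothetical stand-in types (`ProducerMetrics`, `NoOpProducerMetrics`, `SimpleProducerRegister` are not Flink's real classes). Without a guard, a `null` metrics object fails with an NPE the first time a producer is created; the sketch adds `Objects.requireNonNull` so it fails at construction instead, and tests pass a no-op instance, which is the role `UnregisteredMetricsGroup` plays in Flink tests.

```java
import java.util.Objects;

// Hypothetical metrics hook; not Flink's real MetricGroup API.
interface ProducerMetrics {
    void onProducerCreated(String topic);
}

// No-op implementation, analogous to what UnregisteredMetricsGroup provides in tests.
final class NoOpProducerMetrics implements ProducerMetrics {
    static final NoOpProducerMetrics INSTANCE = new NoOpProducerMetrics();

    private NoOpProducerMetrics() {}

    @Override
    public void onProducerCreated(String topic) {
        // Intentionally does nothing.
    }
}

// Hypothetical simplified register that calls its metrics object without null checks.
final class SimpleProducerRegister {
    private final ProducerMetrics metrics;
    private int producersCreated = 0;

    SimpleProducerRegister(ProducerMetrics metrics) {
        // Fail fast here rather than with an NPE deep inside createProducer().
        this.metrics = Objects.requireNonNull(metrics, "metrics must not be null");
    }

    void createProducer(String topic) {
        producersCreated++;
        metrics.onProducerCreated(topic);
    }

    int producersCreated() {
        return producersCreated;
    }
}
```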








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-17 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r809181383



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/metrics/PulsarSinkWriterMetrics.java
##
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.util.Comparator;
+import java.util.Map;
+
+/** Util class to provide monitor metrics methods to Sink Writer. */
+@Internal
+public class PulsarSinkWriterMetrics {
+public static final String PULSAR_SINK_METRIC_GROUP = "PulsarSink";
+public static final String PRODUCER_SUBGROUP = "producer";
+public static final String METRIC_PRODUCER_SEND_LATENCY_MAX = "sendLatencyMax";
+public static final String METRIC_NUM_ACKS_RECEIVED = "numAcksReceived";
+public static final String METRIC_SEND_LATENCY_50_PCT = "sendLatency50Pct";
+public static final String METRIC_SEND_LATENCY_75_PCT = "sendLatency75Pct";
+public static final String METRIC_SEND_LATENCY_95_PCT = "sendLatency95Pct";
+public static final String METRIC_SEND_LATENCY_99_PCT = "sendLatency99Pct";
+public static final String METRIC_SEND_LATENCY_999_PCT = "sendLatency999Pct";
+public static final double INVALID_LATENCY = -1d;
+public static final long INVALID_LAST_SEND_TIME = -1;
+
+private final SinkWriterMetricGroup sinkWriterMetricGroup;
+private final MetricGroup producerMetricGroup;
+
+// Standard counter metrics
+private long lastNumBytesOut = 0;
+private long lastNumRecordsOut = 0;
+private long lastNumRecordsOutErrors = 0;
+
+// Pulsar Producer counter metrics
+private final Counter numAcksReceived;
+private long lastNumAcksReceived = 0;
+
+public PulsarSinkWriterMetrics(SinkWriterMetricGroup sinkWriterMetricGroup) {
+this.sinkWriterMetricGroup = sinkWriterMetricGroup;
+this.producerMetricGroup = sinkWriterMetricGroup.addGroup(PULSAR_SINK_METRIC_GROUP);
+this.numAcksReceived = producerMetricGroup.counter(METRIC_NUM_ACKS_RECEIVED);
+}
+
+public  void updateProducerStats(
+Map>> producerRegister) {
+long numBytesOut = 0;
+long numRecordsOut = 0;
+long numRecordsOutErrors = 0;
+long numAcksReceived = 0;
+
+for (Map> producers : producerRegister.values()) {
+for (Producer producer : producers.values()) {
+ProducerStats stats = producer.getStats();

Review comment:
   Split the nested loops and refactor them into helper functions.
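A sketch of what that refactoring might look like, under stated assumptions: `StatsSnapshot` is a hypothetical stand-in for Pulsar's `ProducerStats`, and the register is modeled as a topic-to-(schema key-to-stats) map. One helper flattens the nested maps into a single stream, and small `sum`/`max` helpers replace the per-field loop bodies.

```java
import java.util.Map;
import java.util.function.ToDoubleFunction;
import java.util.function.ToLongFunction;
import java.util.stream.Stream;

// Hypothetical per-producer statistics snapshot (stand-in for Pulsar's ProducerStats).
final class StatsSnapshot {
    final long numBytesSent;
    final long numAcksReceived;
    final double sendLatencyMax;

    StatsSnapshot(long numBytesSent, long numAcksReceived, double sendLatencyMax) {
        this.numBytesSent = numBytesSent;
        this.numAcksReceived = numAcksReceived;
        this.sendLatencyMax = sendLatencyMax;
    }
}

final class ProducerStatsAggregator {
    private ProducerStatsAggregator() {}

    // Flattens topic -> (schema key -> stats) into one stream; this replaces the nested loops.
    static <K> Stream<StatsSnapshot> allStats(Map<String, Map<K, StatsSnapshot>> register) {
        return register.values().stream().flatMap(byKey -> byKey.values().stream());
    }

    // Sums one counter-like field across every producer.
    static <K> long sum(
            Map<String, Map<K, StatsSnapshot>> register, ToLongFunction<StatsSnapshot> field) {
        return allStats(register).mapToLong(field).sum();
    }

    // Maximum of one gauge-like field, or `fallback` when no producer exists yet.
    static <K> double max(
            Map<String, Map<K, StatsSnapshot>> register,
            ToDoubleFunction<StatsSnapshot> field,
            double fallback) {
        return allStats(register).mapToDouble(field).max().orElse(fallback);
    }
}
```

Passing `-1d` as the fallback would mirror the `INVALID_LATENCY` constant in the class under review.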








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-17 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r809602712



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##
@@ -124,8 +131,15 @@ public PulsarWriter(
 throw new FlinkRuntimeException("Cannot initialize schema.", e);
 }
 
+// Initialize metrics
+this.pulsarSinkWriterMetrics = new PulsarSinkWriterMetrics(initContext.metricGroup());
+this.lastSendTime = PulsarSinkWriterMetrics.INVALID_LAST_SEND_TIME;
 // Create this producer register after opening serialization schema!
-this.producerRegister = new TopicProducerRegister(sinkConfiguration);
+this.producerRegister =
+new TopicProducerRegister(sinkConfiguration, pulsarSinkWriterMetrics);

Review comment:
   Consider keeping pulsarSinkWriterMetrics out of TopicProducerRegister; instead, all metric registration code should live in PulsarWriter.
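A minimal sketch of that separation, with hypothetical classes (`MetricFreeRegister`, `MetricOwningWriter`; producers are modeled as plain name strings for brevity). The register only manages producers and exposes a read-only view; the writer owns every metric registration and registers each producer's gauge at most once.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical register that manages producers and knows nothing about metrics.
final class MetricFreeRegister {
    private final Map<String, String> producersByTopic = new LinkedHashMap<>();

    String getOrCreate(String topic) {
        return producersByTopic.computeIfAbsent(topic, t -> "producer-" + t);
    }

    // Read-only view the writer can inspect when registering metrics.
    Collection<String> producerNames() {
        return Collections.unmodifiableCollection(producersByTopic.values());
    }
}

// Hypothetical writer that owns all metric registration.
final class MetricOwningWriter {
    private final MetricFreeRegister register = new MetricFreeRegister();
    private final Set<String> registeredGauges = new HashSet<>();

    void write(String topic) {
        register.getOrCreate(topic);
        registerNewProducerGauges();
    }

    // Set semantics make this idempotent: re-registering a known producer is a no-op.
    private void registerNewProducerGauges() {
        for (String producerName : register.producerNames()) {
            registeredGauges.add(producerName + ".sendLatency99Pct");
        }
    }

    Set<String> registeredGauges() {
        return Collections.unmodifiableSet(registeredGauges);
    }
}
```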








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-18 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r810455595



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##
@@ -270,5 +287,40 @@ public void flush(boolean endOfInput) throws IOException, InterruptedException {
 public void close() throws Exception {
 // Close all the resources and throw the exception at last.
 closeAll(metadataListener, producerRegister);
+closed = true;
+}
+
+private void setupFlinkMetrics() {
+lastMetricUpdateTimestamp = timeService.getCurrentProcessingTime();
+registerGlobalGauges();
+registerMetricUpdateTimer();
+}
+
+/**
+ * Producers are lazily initialized in the Pulsar sink. Some metrics (currently only some gauges) can
+ * only be set after the producer is created; those gauges are per producer. Other
+ * metrics are aggregated across all producers or are based on the sink implementation, so
+ * they are called global gauges.
+ */
+private void registerGlobalGauges() {
+pulsarSinkWriterMetrics.setCurrentSendTimeGauge(this::getCurrentSendTime);
+producerRegister.registerMaxSendLatencyGauges();
+}
+
+private void registerMetricUpdateTimer() {

Review comment:
   Move these timer tasks to a dedicated class.
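A sketch of such a dedicated class, assuming a simplified manual-clock timer service (`ManualTimeService` is hypothetical and only loosely modeled on a processing-time service; `PeriodicMetricUpdater` is an illustrative name). The updater owns the reschedule loop, so the writer only calls `start()` and `close()`, and a closed updater stops rescheduling itself.

```java
import java.util.PriorityQueue;
import java.util.function.LongConsumer;

// Hypothetical manual-clock timer service, loosely modeled on a processing-time service.
final class ManualTimeService {
    private static final class PendingTimer {
        final long time;
        final LongConsumer callback;

        PendingTimer(long time, LongConsumer callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final PriorityQueue<PendingTimer> timers =
            new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));
    private long now = 0L;

    long currentTime() {
        return now;
    }

    void registerTimer(long time, LongConsumer callback) {
        timers.add(new PendingTimer(time, callback));
    }

    // Moves the clock forward and fires every due timer in time order.
    void advance(long delta) {
        now += delta;
        while (!timers.isEmpty() && timers.peek().time <= now) {
            PendingTimer timer = timers.poll();
            timer.callback.accept(timer.time);
        }
    }
}

// Hypothetical dedicated class owning the periodic metric-update timer.
final class PeriodicMetricUpdater implements AutoCloseable {
    private final ManualTimeService timeService;
    private final long intervalMillis;
    private final Runnable updateTask;
    private boolean closed = false;

    PeriodicMetricUpdater(ManualTimeService timeService, long intervalMillis, Runnable updateTask) {
        this.timeService = timeService;
        this.intervalMillis = intervalMillis;
        this.updateTask = updateTask;
    }

    void start() {
        scheduleNext(timeService.currentTime());
    }

    private void scheduleNext(long from) {
        timeService.registerTimer(from + intervalMillis, this::onTimer);
    }

    private void onTimer(long firedAt) {
        if (closed) {
            return; // stop the reschedule loop after close()
        }
        updateTask.run();
        scheduleNext(firedAt);
    }

    @Override
    public void close() {
        closed = true;
    }
}
```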








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-21 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r811612902



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
##
@@ -124,8 +131,17 @@ public PulsarWriter(
 throw new FlinkRuntimeException("Cannot initialize schema.", e);
 }
 
+// Initialize metrics
+this.pulsarSinkWriterMetricUtils =

Review comment:
   The naming is a bit confusing; this should not be a Utils class. We need to come up with a better name.








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-02-21 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r811614446



##
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java
##
@@ -44,32 +47,198 @@
 import org.apache.flink.util.UserCodeClassLoader;
 
 import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
 import java.util.Collection;
 import java.util.OptionalLong;
+import java.util.concurrent.TimeUnit;
 
 import static java.util.Collections.singletonList;
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.base.DeliveryGuarantee.AT_LEAST_ONCE;
 import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE;
+import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS;
+import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_PRODUCER_NAME;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION;
+import static org.apache.flink.connector.pulsar.sink.writer.metrics.PulsarProducerMetricsRegister.PRODUCER_UPDATE_POLLING_INTERVAL_MILLIS;
 import static org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema.pulsarSchema;
+import static org.apache.flink.shaded.guava30.com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Unit tests for {@link PulsarWriter}. */
 class PulsarWriterTest extends PulsarTestSuiteBase {
-
 private static final SinkWriter.Context CONTEXT = new MockSinkWriterContext();
+private static final String DEFAULT_TEST_PRODUCER_NAME = "test-producer";
+private static final String EXPECTED_ALL_PRODUCER_METRIC_NAME = "PulsarSink.numAcksReceived";
+private static final String EXPECTED_PER_PRODUCER_METRIC_PREFIX = "PulsarSink.producer";
+private static final String EXPECTED_PER_PRODUCER_METRIC_NAME = "sendLatency99Pct";
+private static final String GLOBAL_MAX_LATENCY_METRIC_NAME = "PulsarSink.sendLatencyMax";
+private static final String CURRENT_SEND_TIME_METRIC_NAME = "currentSendTime";
+private static final String NUM_ACKS_RECEIVED_METRIC_NAME = "PulsarSink.numAcksReceived";
+private static final long TEST_PULSAR_STATS_INTERVAL_SECONDS = 1L;
+
+private MetricListener metricListener;
+private TestProcessingTimeService timeService;
+private MockInitContext initContext;
+
+@BeforeEach
+void setup() throws Exception {
+metricListener = new MetricListener();
+timeService = new TestProcessingTimeService();
+timeService.setCurrentTime(0L);
+initContext = new MockInitContext(metricListener, timeService);
+}
 
 @Test
 void writeMessageWithGuarantee() throws Exception {
 writeMessageWithoutGuarantee(EXACTLY_ONCE);
 }
 
+@Test
+void metricsPresentAfterWriterCreated() throws Exception {
+String topic = randomAlphabetic(10);
+operator().createTopic(topic, 8);
+
+try (final PulsarWriter ignored = createWriter(topic, initContext)) {
+assertThat(metricListener.getCounter(EXPECTED_ALL_PRODUCER_METRIC_NAME)).isPresent();
+}
+}
+
+@Test
+void perProducerMetricsPresentAfterMessageWritten() throws Exception {
+String topic = randomAlphabetic(10);
+operator().createTopic(topic, 8);
+
+try (final PulsarWriter writer = createWriter(topic, initContext)) {
+String message = randomAlphabetic(10);
+sendMessages(writer, 1);
+// advance timer to update the producers
+timeService.advance(PRODUCER_UPDATE_POLLING_INTERVAL_MILLIS + 1000);
+assertThat(metricListener.getGauge(perProducerMetricName(topic, 0))).isPresent();
+
+// send another message and advance timer
+sendMessages(writer, 1000);

Review comment:
   The 1000 here is a magic number; it should equal the switch size of the round-robin router.
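The reviewer's point can be sketched with a hypothetical `SwitchingRoundRobinRouter` (an illustration, not the connector's real router class). If the router moves to the next partition every `switchSize` messages, then after one initial message a test must send exactly `switchSize` more to reach the second partition (and hence its producer); naming the value makes that dependency explicit instead of hiding it in a bare `1000`.

```java
import java.util.List;

// Hypothetical round-robin router that advances to the next partition after every
// `switchSize` messages; a stand-in for the sink's round-robin routing.
final class SwitchingRoundRobinRouter {
    private final List<String> partitions;
    private final int switchSize;
    private long messageCount = 0;

    SwitchingRoundRobinRouter(List<String> partitions, int switchSize) {
        if (partitions.isEmpty()) {
            throw new IllegalArgumentException("partitions must not be empty");
        }
        if (switchSize <= 0) {
            throw new IllegalArgumentException("switchSize must be positive");
        }
        this.partitions = List.copyOf(partitions);
        this.switchSize = switchSize;
    }

    // Every block of `switchSize` consecutive messages goes to the same partition.
    String route() {
        int index = (int) ((messageCount / switchSize) % partitions.size());
        messageCount++;
        return partitions.get(index);
    }
}
```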








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-04-01 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r840379481



##
File path: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
##
@@ -178,4 +182,35 @@ public T getValue() {
 return null;
 }
 }
+
+private static class UnregisteredSinkWriterMetricGroup extends UnregisteredMetricsGroup

Review comment:
   Because I didn't find a good way to mock SinkWriterMetricGroup, and I saw other UnregisteredXXXMetricGroup classes used in Sink








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-04-01 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r840379481



##
File path: 
flink-metrics/flink-metrics-core/src/main/java/org/apache/flink/metrics/groups/UnregisteredMetricsGroup.java
##
@@ -178,4 +182,35 @@ public T getValue() {
 return null;
 }
 }
+
+private static class UnregisteredSinkWriterMetricGroup extends UnregisteredMetricsGroup

Review comment:
   Because I didn't find a good way to mock SinkWriterMetricGroup, and I saw other UnregisteredXXXMetricGroup classes used in tests, I decided to create one for the sink.
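The shape of such a class can be sketched with hypothetical minimal interfaces (`SimpleCounter`, `SimpleMetricGroup`, `SimpleSinkWriterMetricGroup` are simplified stand-ins; Flink's real `MetricGroup` and `SinkWriterMetricGroup` declare many more methods). The sink-specific no-op group inherits the generic no-op behavior and only fills in the sink-writer accessors, so tests get working, inert metrics without a mocking framework.

```java
// Hypothetical minimal metric interfaces, simplified from Flink's real ones.
interface SimpleCounter {
    void inc();

    long getCount();
}

interface SimpleMetricGroup {
    SimpleCounter counter(String name);
}

interface SimpleSinkWriterMetricGroup extends SimpleMetricGroup {
    SimpleCounter getNumRecordsSendCounter();
}

// Counter that accepts increments but never records them.
final class NoOpCounter implements SimpleCounter {
    static final NoOpCounter INSTANCE = new NoOpCounter();

    private NoOpCounter() {}

    @Override
    public void inc() {}

    @Override
    public long getCount() {
        return 0L;
    }
}

// Generic no-op group, playing the role UnregisteredMetricsGroup plays in Flink.
class NoOpMetricGroup implements SimpleMetricGroup {
    @Override
    public SimpleCounter counter(String name) {
        return NoOpCounter.INSTANCE;
    }
}

// Sink-specific no-op group: reuses the generic no-op behavior and adds only the
// sink-writer accessors.
final class NoOpSinkWriterMetricGroup extends NoOpMetricGroup
        implements SimpleSinkWriterMetricGroup {
    @Override
    public SimpleCounter getNumRecordsSendCounter() {
        return NoOpCounter.INSTANCE;
    }
}
```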








[GitHub] [flink] imaffe commented on a change in pull request #18816: [FLINK-26027][pulsar] add sink metrics to pulsar sink

2022-03-16 Thread GitBox


imaffe commented on a change in pull request #18816:
URL: https://github.com/apache/flink/pull/18816#discussion_r828062932



##
File path: 
flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/metrics/PulsarSinkWriterMetricUtils.java
##
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.sink.writer.metrics;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerStats;
+import org.apache.pulsar.common.schema.SchemaInfo;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/** Util class to provide monitor metrics methods to Sink Writer. */
+@Internal
+public class PulsarSinkWriterMetricUtils {

Review comment:
   Yeah, I agree.



