Copilot commented on code in PR #4156:
URL: https://github.com/apache/gobblin/pull/4156#discussion_r2542767484
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java:
##########
@@ -30,4 +30,13 @@ public interface Pusher<M> extends Closeable {
* @param messages List of byte array messages to push to Kakfa.
*/
void pushMessages(List<M> messages);
+
+ /**
+ * Synchronous version of {@link #pushMessages(List)}.
+ * Default implementation just calls {@link #pushMessages(List)}.
+ * @param messages
Review Comment:
Missing JavaDoc parameter description. The `@param messages` tag should
include a description of what messages are being pushed, similar to line 30.
```suggestion
* @param messages List of byte array messages to push to Kafka.
```
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/KafkaEventKeyValueReporter.java:
##########
@@ -54,6 +54,27 @@ protected KafkaEventKeyValueReporter(Builder<?> builder) throws IOException {
@Override
public void reportEventQueue(Queue<GobblinTrackingEvent> queue) {
+ List<Pair<String, byte[]>> events = getEventsFromQueue(queue);
+ if (!events.isEmpty()) {
+ log.info("Pushing {} Gobblin Tracking Events to Kafka", events.size());
+ this.kafkaPusher.pushMessages(events);
+ } else {
+ log.debug("No GTE to push.");
+ }
+ }
+
+ @Override
+ public void reportEventQueueSynchronously(Queue<GobblinTrackingEvent> queue) {
+ List<Pair<String, byte[]>> events = getEventsFromQueue(queue);
+ if (!events.isEmpty()) {
+ log.info("Pushing {} Gobblin Tracking Events to Kafka", events.size());
+ this.kafkaPusher.pushMessagesSync(events);
+ } else {
+ log.debug("No GTE to push.");
+ }
+ }
Review Comment:
Code duplication: The logic in `reportEventQueue()` and
`reportEventQueueSynchronously()` is identical except for the call to
`pushMessages()` vs `pushMessagesSync()`. Consider refactoring by having both
methods call a common helper method that accepts the push method as a
parameter, or have one method call the other with a boolean flag to indicate
sync/async mode.
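A minimal sketch of the first option, assuming simplified stand-ins for the real classes: events are plain `String`s rather than `Pair<String, byte[]>`, and `GteReporterSketch`, the nested `Pusher` stub, and the helper name `drainAndPush` are all hypothetical names chosen for illustration, not part of the PR. It only shows how both public methods could share one helper that takes the push method as a parameter.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for KafkaEventKeyValueReporter; events are plain Strings here.
class GteReporterSketch {

  // Minimal stub of Pusher<M>: the sync variant defaults to the async one, as the PR's Javadoc describes.
  interface Pusher<M> {
    void pushMessages(List<M> messages);

    default void pushMessagesSync(List<M> messages) {
      pushMessages(messages);
    }
  }

  private final Pusher<String> pusher;

  GteReporterSketch(Pusher<String> pusher) {
    this.pusher = pusher;
  }

  public void reportEventQueue(Queue<String> queue) {
    drainAndPush(queue, pusher::pushMessages);
  }

  public void reportEventQueueSynchronously(Queue<String> queue) {
    drainAndPush(queue, pusher::pushMessagesSync);
  }

  // Shared logic (hypothetical helper): drain the queue, then hand the batch to the given push method.
  private void drainAndPush(Queue<String> queue, Consumer<List<String>> push) {
    List<String> events = new ArrayList<>();
    String event;
    while ((event = queue.poll()) != null) {
      events.add(event);
    }
    if (!events.isEmpty()) {
      push.accept(events);
    }
  }

  public static void main(String[] args) {
    // The stub Pusher has a single abstract method, so a lambda can implement it.
    GteReporterSketch reporter =
        new GteReporterSketch(batch -> System.out.println("pushed " + batch.size() + " events"));
    Queue<String> queue = new ArrayDeque<>(Arrays.asList("a", "b"));
    reporter.reportEventQueueSynchronously(queue);
  }
}
```

Passing a method reference keeps the two public entry points as one-liners and avoids a boolean flag at the call sites; whether that fits the real reporter depends on details not shown in this diff.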
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
import java.util.Map;
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.cluster.ContainerMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
+@Slf4j
public class SubmitGTEActivityImpl implements SubmitGTEActivity {
- private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
@Override
public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
- log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+ log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
eventSubmitterContext.create().submit(eventBuilder);
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+ if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
+ config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
+ String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+ String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+ ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
+ log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),
Review Comment:
Redundant `toString()` call. SLF4J calls `toString()` on each argument bound
to a `{}` placeholder itself, and only when the message is actually logged, so
the explicit `.toString()` is unnecessary and should be removed.
```suggestion
log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics,
```
##########
gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/reporter/EventReporter.java:
##########
@@ -163,12 +163,28 @@ public void report() {
reportEventQueue(this.reportingQueue);
}
+ /**
+ * Report all {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s in the queue synchronously.
+ */
+ public void reportSynchronously() {
+ reportEventQueueSynchronously(this.reportingQueue);
+ }
+
/**
* Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue.
* @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted.
*/
public abstract void reportEventQueue(Queue<GobblinTrackingEvent> queue);
+ /**
+ * Emit all {@link org.apache.gobblin.metrics.GobblinTrackingEvent} in queue synchronously.
+ * Default implementation just calls {@link #reportEventQueue(Queue)} for backward compatibility.
+ * @param queue
Review Comment:
Missing JavaDoc parameter description. The `@param queue` tag should include
a description of the queue parameter, similar to line 175.
```suggestion
* @param queue {@link java.util.Queue} containing {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s that should be emitted synchronously.
```
##########
gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/metrics/kafka/Pusher.java:
##########
@@ -30,4 +30,13 @@ public interface Pusher<M> extends Closeable {
* @param messages List of byte array messages to push to Kakfa.
Review Comment:
Typo in JavaDoc: "Kakfa" should be "Kafka".
```suggestion
* @param messages List of byte array messages to push to Kafka.
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
import java.util.Map;
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.cluster.ContainerMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
+@Slf4j
public class SubmitGTEActivityImpl implements SubmitGTEActivity {
- private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
@Override
public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
- log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+ log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
eventSubmitterContext.create().submit(eventBuilder);
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+ if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
+ config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
+ String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+ String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+ ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
+ log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),
+ containerMetricsTaskRunnerId, containerMetricsApplicationName);
+ Optional<KafkaAvroEventKeyValueReporter> kafkaReporterOptional = containerMetrics.getScheduledReporter(KafkaAvroEventKeyValueReporter.class);
+ if (kafkaReporterOptional.isPresent()) {
+ log.info("Submitting GTE in synchronous manner to Kafka reporter");
+ kafkaReporterOptional.get().reportSynchronously();
+ log.info("Submitted GTE to Kafka reporter");
+ } else {
+ log.error("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",
Review Comment:
Logging at ERROR level may not be appropriate here. If the
KafkaAvroEventKeyValueReporter is optional, this should be logged at WARN or
INFO level. ERROR level should be reserved for actual failure conditions that
prevent the operation from completing successfully.
```suggestion
log.warn("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",
```
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/SubmitGTEActivityImpl.java:
##########
@@ -19,22 +19,50 @@
import java.util.Map;
-import io.temporal.workflow.Workflow;
-
-import org.slf4j.Logger;
+import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.gobblin.cluster.ContainerMetrics;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.util.ConfigUtils;
+@Slf4j
public class SubmitGTEActivityImpl implements SubmitGTEActivity {
- private static Logger log = Workflow.getLogger(SubmitGTEActivityImpl.class);
@Override
public void submitGTE(GobblinEventBuilder eventBuilder, EventSubmitterContext eventSubmitterContext) {
- log.info("submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
+ log.info("Submitting GTE - {}", summarizeEventMetadataForLogging(eventBuilder));
eventSubmitterContext.create().submit(eventBuilder);
+ Config config = WorkerConfig.of(this).orElse(ConfigFactory.load());
+ if (config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID) &&
+ config.hasPath(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME)) {
+ String containerMetricsApplicationName = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_APPLICATION_NAME);
+ String containerMetricsTaskRunnerId = config.getString(GobblinTemporalConfigurationKeys.GOBBLIN_TEMPORAL_CONTAINER_METRICS_TASK_RUNNER_ID);
+ ContainerMetrics containerMetrics = ContainerMetrics.get(ConfigUtils.configToState(config), containerMetricsApplicationName, containerMetricsTaskRunnerId);
+ log.info("Fetched container metrics instance {} for taskRunnerId: {} and applicationName: {}", containerMetrics.toString(),
+ containerMetricsTaskRunnerId, containerMetricsApplicationName);
+ Optional<KafkaAvroEventKeyValueReporter> kafkaReporterOptional = containerMetrics.getScheduledReporter(KafkaAvroEventKeyValueReporter.class);
+ if (kafkaReporterOptional.isPresent()) {
+ log.info("Submitting GTE in synchronous manner to Kafka reporter");
+ kafkaReporterOptional.get().reportSynchronously();
+ log.info("Submitted GTE to Kafka reporter");
+ } else {
+ log.error("No KafkaAvroEventKeyValueReporter found in container metrics for taskRunnerId: {} and applicationName: {}",
+ containerMetricsTaskRunnerId, containerMetricsApplicationName);
+ }
+ } else {
+ log.error("Both {} and {} must be set to fetch container metrics instance for synchronous GTE submission",
Review Comment:
Logging at ERROR level may not be appropriate here. If these configuration
keys are optional (as suggested by the conditional check), then their absence
should be logged at WARN or INFO level instead. ERROR level typically indicates
a problem that prevents normal operation, but the code continues without these
values.
```suggestion
log.warn("Both {} and {} should be set to fetch container metrics instance for synchronous GTE submission",
```