This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git

The following commit(s) were added to refs/heads/master by this push:
     new 010d14879 Emit to diagnostic kafka topic in round robin fashion to support increasing this topic partition count (#1631)
010d14879 is described below

commit 010d14879f401cc01f083cf84f32eb8741cc8398
Author: jia-gao <94939653+jia-...@users.noreply.github.com>
AuthorDate: Tue Sep 27 18:44:07 2022 -0700

    Emit to diagnostic kafka topic in round robin fashion to support increasing this topic partition count (#1631)

    Symptom: For some Samza jobs, the single partition of the diagnostic stream grows large and can cause issues in the underlying system, such as Kafka.

    Cause: Diagnostics topics for Samza jobs are currently created with a single partition, because the topic is created in DiagnosticsUtil with the default partition count, which is 1. Increasing the partition count of the diagnostic stream may be needed to resolve the issue above. However, Samza currently uses the hostname as the partition key for diagnostic msgs. Hostnames can be very similar, so hashing them as the partition key does not distribute the msgs evenly across partitions.

    Change: Use a null partition key instead of the hostname, and rely on the Kafka default partitioner to distribute the events evenly across partitions. With the default partition count (1) behavior is unchanged, since every msg goes to the same partition regardless of the partition key.

    API changes: None

    Backward compatible: No. The change is backward incompatible only when the diagnostic stream has more than one partition. Samza creates the diagnostic stream with 1 partition by default, but the stream may pre-exist or its partition count may have been increased after creation.

    Test Done: ./gradlew build
---
 .../src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index f420a93c9..5715365fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -240,7 +240,7 @@ public class DiagnosticsManager {
     if (!diagnosticsStreamMessage.isEmpty()) {
       systemProducer.send(DiagnosticsManager.class.getName(),
-          new OutgoingMessageEnvelope(diagnosticSystemStream, hostname, null,
+          new OutgoingMessageEnvelope(diagnosticSystemStream, null, null,
               new MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot())));
       systemProducer.flush(DiagnosticsManager.class.getName());
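
The reasoning in the commit message can be illustrated with a minimal sketch. This is not Samza's or Kafka's actual partitioning code: it assumes a keyed message is sent to hash(key) modulo the partition count and a null-key message is spread round robin. The class PartitionSketch and its methods are hypothetical names introduced only for this illustration. Kafka's real default partitioner hashes the serialized key bytes, and newer clients may fill a batch for one partition before moving to the next rather than strictly alternating.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a producer-side partitioner (illustration only).
final class PartitionSketch {
    private final AtomicInteger counter = new AtomicInteger();
    private final int numPartitions;

    PartitionSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    // Assumed policy: a non-null key is hashed, a null key is assigned round robin.
    int partitionFor(Object partitionKey) {
        if (partitionKey == null) {
            // floorMod keeps the result non-negative even if the counter overflows.
            return Math.floorMod(counter.getAndIncrement(), numPartitions);
        }
        return Math.floorMod(partitionKey.hashCode(), numPartitions);
    }

    // Sends messageCount messages with the same key and counts messages per partition.
    int[] distribute(Object partitionKey, int messageCount) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messageCount; i++) {
            counts[partitionFor(partitionKey)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // "host-001.example.com" is a made-up hostname used as the partition key.
        int[] keyed = new PartitionSketch(4).distribute("host-001.example.com", 8);
        int[] unkeyed = new PartitionSketch(4).distribute(null, 8);
        System.out.println("hostname key: " + java.util.Arrays.toString(keyed));
        System.out.println("null key:     " + java.util.Arrays.toString(unkeyed));
    }
}
```

Under these assumptions, all eight messages keyed by one hostname land in a single partition, while the eight null-key messages are split two per partition across the four partitions. With a partition count of 1, both policies send every message to partition 0, which matches the note that the default setup is unaffected.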