This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 010d14879 Emit to diagnostic kafka topic in round robin fashion to support increasing this topic partition count (#1631)
010d14879 is described below

commit 010d14879f401cc01f083cf84f32eb8741cc8398
Author: jia-gao <94939653+jia-...@users.noreply.github.com>
AuthorDate: Tue Sep 27 18:44:07 2022 -0700

    Emit to diagnostic kafka topic in round robin fashion to support increasing this topic partition count (#1631)
    
    Symptom: For some Samza jobs, the diagnostic stream partition grows very large, which can cause issues in the underlying stream system, such as Kafka.
    
    Cause:
    Diagnostics topics for Samza jobs are currently created with a single partition, because they are created in DiagnosticsUtil with the default partition count, which is 1.
    Increasing the partition count of the diagnostic stream may be needed to resolve the issue above.
    However, Samza currently uses the hostname as the partition key for diagnostic messages. Hostnames can be very similar, so hashing them as the partition key does not distribute the messages evenly across partitions.
    
    Change:
    Use a “null” partition key instead of the hostname, and rely on the Kafka default partitioner to distribute the events evenly across partitions.
    Note that this change is backward compatible when the stream has the default partition count (1), since every message goes to the same partition regardless of the partition key.
    
    API changes: None
    
    Backward compatible: No. The change is backward incompatible only when the diagnostic stream has more than one partition. Samza creates the diagnostic stream with 1 partition by default, but the stream may already exist, or its partition count may have been increased after creation.
    
    Test Done:
    ./gradlew build
---
 .../src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
index f420a93c9..5715365fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
+++ b/samza-core/src/main/scala/org/apache/samza/diagnostics/DiagnosticsManager.java
@@ -240,7 +240,7 @@ public class DiagnosticsManager {
         if (!diagnosticsStreamMessage.isEmpty()) {
 
           systemProducer.send(DiagnosticsManager.class.getName(),
-              new OutgoingMessageEnvelope(diagnosticSystemStream, hostname, null,
+              new OutgoingMessageEnvelope(diagnosticSystemStream, null, null,
                   new MetricsSnapshotSerdeV2().toBytes(diagnosticsStreamMessage.convertToMetricsSnapshot())));
           systemProducer.flush(DiagnosticsManager.class.getName());
 

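The effect described in the commit message can be illustrated with a minimal, self-contained sketch. It is not Samza or Kafka code: the class name, method names, and host names below are hypothetical, `Math.floorMod(key.hashCode(), n)` is only a stand-in for whatever hash the real producer path applies to a partition key, and strict round robin is only a simplified model of unkeyed sends (the actual behavior of the Kafka default partitioner for null keys depends on the client version).

```java
import java.util.Arrays;
import java.util.List;

public class PartitionSpreadSketch {

  // Keyed routing: every message with the same key lands on the same partition.
  // floorMod of String.hashCode() is a stand-in, not Samza's or Kafka's exact hash.
  static int[] countByKeyHash(List<String> keys, int partitionCount) {
    int[] counts = new int[partitionCount];
    for (String key : keys) {
      counts[Math.floorMod(key.hashCode(), partitionCount)]++;
    }
    return counts;
  }

  // Unkeyed routing, modeled here as strict round robin over the partitions.
  static int[] countRoundRobin(int messageCount, int partitionCount) {
    int[] counts = new int[partitionCount];
    for (int i = 0; i < messageCount; i++) {
      counts[i % partitionCount]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // Hypothetical host names; one diagnostic message per list entry.
    List<String> hosts = Arrays.asList(
        "host-a", "host-a", "host-a", "host-b", "host-b", "host-a", "host-a", "host-a");
    int partitions = 4;
    System.out.println("keyed by hostname: "
        + Arrays.toString(countByKeyHash(hosts, partitions)));
    System.out.println("round robin:       "
        + Arrays.toString(countRoundRobin(hosts.size(), partitions)));
  }
}
```

With keyed routing, the number of partitions that receive traffic can never exceed the number of distinct keys, so adding partitions does not help a job that runs on a few hosts. Dropping the key removes that limit.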