This is an automated email from the ASF dual-hosted git repository.

rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 70d306200 [STORM-3404] KafkaOffsetLagUtil can't pull the offset 
correctly (#3377)
70d306200 is described below

commit 70d30620092a98a50b123cbe70a0b0b6aef7c07e
Author: AngryCookie <[email protected]>
AuthorDate: Tue Oct 24 02:14:55 2023 +0800

    [STORM-3404] KafkaOffsetLagUtil can't pull the offset correctly (#3377)
    
    [STORM-3404] KafkaOffsetLagUtil can't pull the offset correctly
    
    ---------
    
    Co-authored-by: 王星轶 <[email protected]>
---
 .../org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java | 12 +++++++++++-
 .../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java      | 14 +++++++++++++-
 2 files changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 94b84f0cd..77d9f78e6 100644
--- 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++ 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -48,6 +48,8 @@ public class KafkaOffsetLagUtil {
     private static final String OPTION_GROUP_ID_LONG = "groupid";
     private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
     private static final String OPTION_SECURITY_PROTOCOL_LONG = 
"security-protocol";
+    private static final String OPTION_SASL_MECHANISM_SHORT = "m";
+    private static final String OPTION_SASL_MECHANISM_LONG = "sasl-mechanism";
     private static final String OPTION_CONSUMER_CONFIG_SHORT = "c";
     private static final String OPTION_CONSUMER_CONFIG_LONG = 
"consumer-config";
 
@@ -60,13 +62,14 @@ public class KafkaOffsetLagUtil {
                 printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
             }
             String securityProtocol = 
commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
+            String saslMechanism = 
commandLine.getOptionValue(OPTION_SASL_MECHANISM_LONG);
             if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || 
!commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
                 printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + 
OPTION_BOOTSTRAP_BROKERS_LONG + " are required");
             }
             NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
                 new 
NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
                     commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
-                    commandLine.getOptionValue(OPTION_GROUP_ID_LONG), 
securityProtocol,
+                    commandLine.getOptionValue(OPTION_GROUP_ID_LONG), 
securityProtocol, saslMechanism,
                     commandLine.getOptionValue(OPTION_CONSUMER_CONFIG_LONG));
             List<KafkaOffsetLagResult> results = 
getOffsetLags(newKafkaSpoutOffsetQuery);
 
@@ -119,6 +122,10 @@ public class KafkaOffsetLagUtil {
                 OPTION_SECURITY_PROTOCOL_LONG,
                 true,
                 "Security protocol to connect to kafka");
+        options.addOption(OPTION_SASL_MECHANISM_SHORT,
+                OPTION_SASL_MECHANISM_LONG,
+                true,
+                "Sasl mechanism to connect to kafka, default is GSSAPI");
         options.addOption(OPTION_CONSUMER_CONFIG_SHORT,
                 OPTION_CONSUMER_CONFIG_LONG,
                 true,
@@ -145,6 +152,9 @@ public class KafkaOffsetLagUtil {
             if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
                 props.put("security.protocol", 
newKafkaSpoutOffsetQuery.getSecurityProtocol());
             }
+            if (newKafkaSpoutOffsetQuery.getSaslMechanism() != null) {
+                props.put("sasl.mechanism", 
newKafkaSpoutOffsetQuery.getSaslMechanism());
+            }
             // Read property file for extra consumer properties
             if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() != 
null) {
                 
props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName()));
diff --git 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index 849b7130e..c771a00be 100644
--- 
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++ 
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -27,14 +27,16 @@ public class NewKafkaSpoutOffsetQuery {
     private final String consumerGroupId; // consumer group id for which the 
offset needs to be calculated
     private final String bootStrapBrokers; // bootstrap brokers
     private final String securityProtocol; // security protocol to connect to 
kafka
+    private final String saslMechanism; // Sasl mechanism to connect to kafka, 
default is GSSAPI
     private final String consumerPropertiesFileName; // properties file 
containing additional kafka consumer configs
 
     public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers, 
String consumerGroupId, String securityProtocol,
-        String consumerPropertiesFileName) {
+        String saslMechanism, String consumerPropertiesFileName) {
         this.topics = topics;
         this.bootStrapBrokers = bootstrapBrokers;
         this.consumerGroupId = consumerGroupId;
         this.securityProtocol = securityProtocol;
+        this.saslMechanism = saslMechanism;
         this.consumerPropertiesFileName = consumerPropertiesFileName;
     }
 
@@ -54,6 +56,10 @@ public class NewKafkaSpoutOffsetQuery {
         return this.securityProtocol;
     }
 
+    public String getSaslMechanism() {
+        return this.saslMechanism;
+    }
+
     public String getConsumerPropertiesFileName() {
         return this.consumerPropertiesFileName;
     }
@@ -65,6 +71,7 @@ public class NewKafkaSpoutOffsetQuery {
                 + ", consumerGroupId='" + consumerGroupId + '\''
                 + ", bootStrapBrokers='" + bootStrapBrokers + '\''
                 + ", securityProtocol='" + securityProtocol + '\''
+                + ", saslMechanism='" + saslMechanism + '\''
                 + ", consumerPropertiesFileName='" + 
consumerPropertiesFileName + '\''
                 + '}';
     }
@@ -95,6 +102,10 @@ public class NewKafkaSpoutOffsetQuery {
                                      : that.securityProtocol != null) {
             return false;
         }
+        if (saslMechanism != null ? !saslMechanism.equals(that.saslMechanism)
+                : that.saslMechanism != null) {
+            return false;
+        }
         return consumerPropertiesFileName != null ? consumerPropertiesFileName
             .equals(that.consumerPropertiesFileName) : 
that.consumerPropertiesFileName == null;
     }
@@ -105,6 +116,7 @@ public class NewKafkaSpoutOffsetQuery {
         result = 31 * result + (consumerGroupId != null ? 
consumerGroupId.hashCode() : 0);
         result = 31 * result + (bootStrapBrokers != null ? 
bootStrapBrokers.hashCode() : 0);
         result = 31 * result + (securityProtocol != null ? 
securityProtocol.hashCode() : 0);
+        result = 31 * result + (saslMechanism != null ? 
saslMechanism.hashCode() : 0);
         result = 31 * result + (consumerPropertiesFileName != null ? 
consumerPropertiesFileName.hashCode() : 0);
         return result;
     }

Reply via email to