This is an automated email from the ASF dual-hosted git repository.
rzo1 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 70d306200 [STORM-3404] KafkaOffsetLagUtil can't pull the offset
correctly (#3377)
70d306200 is described below
commit 70d30620092a98a50b123cbe70a0b0b6aef7c07e
Author: AngryCookie <[email protected]>
AuthorDate: Tue Oct 24 02:14:55 2023 +0800
[STORM-3404] KafkaOffsetLagUtil can't pull the offset correctly (#3377)
[STORM-3404] KafkaOffsetLagUtil can't pull the offset correctly
---------
Co-authored-by: 王星轶 <[email protected]>
---
.../org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java | 12 +++++++++++-
.../storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java | 14 +++++++++++++-
2 files changed, 24 insertions(+), 2 deletions(-)
diff --git
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
index 94b84f0cd..77d9f78e6 100644
---
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
+++
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/KafkaOffsetLagUtil.java
@@ -48,6 +48,8 @@ public class KafkaOffsetLagUtil {
private static final String OPTION_GROUP_ID_LONG = "groupid";
private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
private static final String OPTION_SECURITY_PROTOCOL_LONG =
"security-protocol";
+ private static final String OPTION_SASL_MECHANISM_SHORT = "m";
+ private static final String OPTION_SASL_MECHANISM_LONG = "sasl-mechanism";
private static final String OPTION_CONSUMER_CONFIG_SHORT = "c";
private static final String OPTION_CONSUMER_CONFIG_LONG =
"consumer-config";
@@ -60,13 +62,14 @@ public class KafkaOffsetLagUtil {
printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
}
String securityProtocol =
commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
+ String saslMechanism =
commandLine.getOptionValue(OPTION_SASL_MECHANISM_LONG);
if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) ||
!commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " +
OPTION_BOOTSTRAP_BROKERS_LONG + " are required");
}
NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery =
new
NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG),
- commandLine.getOptionValue(OPTION_GROUP_ID_LONG),
securityProtocol,
+ commandLine.getOptionValue(OPTION_GROUP_ID_LONG),
securityProtocol, saslMechanism,
commandLine.getOptionValue(OPTION_CONSUMER_CONFIG_LONG));
List<KafkaOffsetLagResult> results =
getOffsetLags(newKafkaSpoutOffsetQuery);
@@ -119,6 +122,10 @@ public class KafkaOffsetLagUtil {
OPTION_SECURITY_PROTOCOL_LONG,
true,
"Security protocol to connect to kafka");
+ options.addOption(OPTION_SASL_MECHANISM_SHORT,
+ OPTION_SASL_MECHANISM_LONG,
+ true,
+ "Sasl mechanism to connect to kafka, default is GSSAPI");
options.addOption(OPTION_CONSUMER_CONFIG_SHORT,
OPTION_CONSUMER_CONFIG_LONG,
true,
@@ -145,6 +152,9 @@ public class KafkaOffsetLagUtil {
if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
props.put("security.protocol",
newKafkaSpoutOffsetQuery.getSecurityProtocol());
}
+ if (newKafkaSpoutOffsetQuery.getSaslMechanism() != null) {
+ props.put("sasl.mechanism",
newKafkaSpoutOffsetQuery.getSaslMechanism());
+ }
// Read property file for extra consumer properties
if (newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName() !=
null) {
props.putAll(Utils.loadProps(newKafkaSpoutOffsetQuery.getConsumerPropertiesFileName()));
diff --git
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
index 849b7130e..c771a00be 100644
---
a/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
+++
b/external/storm-kafka-monitor/src/main/java/org/apache/storm/kafka/monitor/NewKafkaSpoutOffsetQuery.java
@@ -27,14 +27,16 @@ public class NewKafkaSpoutOffsetQuery {
private final String consumerGroupId; // consumer group id for which the
offset needs to be calculated
private final String bootStrapBrokers; // bootstrap brokers
private final String securityProtocol; // security protocol to connect to
kafka
+ private final String saslMechanism; // Sasl mechanism to connect to kafka,
default is GSSAPI
private final String consumerPropertiesFileName; // properties file
containing additional kafka consumer configs
public NewKafkaSpoutOffsetQuery(String topics, String bootstrapBrokers,
String consumerGroupId, String securityProtocol,
- String consumerPropertiesFileName) {
+ String saslMechanism, String consumerPropertiesFileName) {
this.topics = topics;
this.bootStrapBrokers = bootstrapBrokers;
this.consumerGroupId = consumerGroupId;
this.securityProtocol = securityProtocol;
+ this.saslMechanism = saslMechanism;
this.consumerPropertiesFileName = consumerPropertiesFileName;
}
@@ -54,6 +56,10 @@ public class NewKafkaSpoutOffsetQuery {
return this.securityProtocol;
}
+ public String getSaslMechanism() {
+ return this.saslMechanism;
+ }
+
public String getConsumerPropertiesFileName() {
return this.consumerPropertiesFileName;
}
@@ -65,6 +71,7 @@ public class NewKafkaSpoutOffsetQuery {
+ ", consumerGroupId='" + consumerGroupId + '\''
+ ", bootStrapBrokers='" + bootStrapBrokers + '\''
+ ", securityProtocol='" + securityProtocol + '\''
+ + ", saslMechanism='" + saslMechanism + '\''
+ ", consumerPropertiesFileName='" +
consumerPropertiesFileName + '\''
+ '}';
}
@@ -95,6 +102,10 @@ public class NewKafkaSpoutOffsetQuery {
: that.securityProtocol != null) {
return false;
}
+ if (saslMechanism != null ? !saslMechanism.equals(that.saslMechanism)
+ : that.saslMechanism != null) {
+ return false;
+ }
return consumerPropertiesFileName != null ? consumerPropertiesFileName
.equals(that.consumerPropertiesFileName) :
that.consumerPropertiesFileName == null;
}
@@ -105,6 +116,7 @@ public class NewKafkaSpoutOffsetQuery {
result = 31 * result + (consumerGroupId != null ?
consumerGroupId.hashCode() : 0);
result = 31 * result + (bootStrapBrokers != null ?
bootStrapBrokers.hashCode() : 0);
result = 31 * result + (securityProtocol != null ?
securityProtocol.hashCode() : 0);
+ result = 31 * result + (saslMechanism != null ?
saslMechanism.hashCode() : 0);
result = 31 * result + (consumerPropertiesFileName != null ?
consumerPropertiesFileName.hashCode() : 0);
return result;
}