This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f4129876f1 Add config to preserve stream topic name as a column for `CLPLogRecordExtractor` (#14599)
f4129876f1 is described below
commit f4129876f17a6dffba86b9a1e41972ac0e8e1cd2
Author: Christopher Peck <[email protected]>
AuthorDate: Fri Dec 13 17:25:18 2024 -0800
Add config to preserve stream topic name as a column for `CLPLogRecordExtractor` (#14599)
* Add config to preserve stream topic name in rows
* Fill the topic-name column even when all fields are extracted (extract-all mode)
---
.../inputformat/clplog/CLPLogRecordExtractor.java | 10 ++++++++++
.../clplog/CLPLogRecordExtractorConfig.java | 15 +++++++++++++++
.../clplog/CLPLogRecordExtractorTest.java | 21 ++++++++++++++++++++-
3 files changed, 45 insertions(+), 1 deletion(-)
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
index 2f2e730b12..d4be2e193f 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractor.java
@@ -97,6 +97,11 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
_serverMetrics = serverMetrics;
}
+ public void init(Set<String> fields, @Nullable RecordExtractorConfig
recordExtractorConfig, String topicName) {
+ init(fields, recordExtractorConfig);
+ _topicName = topicName;
+ }
+
@Override
public void init(Set<String> fields, @Nullable RecordExtractorConfig
recordExtractorConfig) {
_config = (CLPLogRecordExtractorConfig) recordExtractorConfig;
@@ -131,6 +136,11 @@ public class CLPLogRecordExtractor extends BaseRecordExtractor<Map<String, Objec
public GenericRow extract(Map<String, Object> from, GenericRow to) {
Set<String> clpEncodedFieldNames = _config.getFieldsForClpEncoding();
+ // Preserve topic name if configured, regardless of _extractAll
+ if (_config.getTopicNameDestinationColumn() != null) {
+ to.putValue(_config.getTopicNameDestinationColumn(), _topicName);
+ }
+
if (_extractAll) {
for (Map.Entry<String, Object> recordEntry : from.entrySet()) {
String recordKey = recordEntry.getKey();
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java
index 3d08f42da5..c735a04f06 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/main/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorConfig.java
@@ -49,6 +49,8 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
public static final String REMOVE_PROCESSED_FIELDS_CONFIG_KEY =
"removeProcessedFields";
public static final String UNENCODABLE_FIELD_SUFFIX_CONFIG_KEY =
"unencodableFieldSuffix";
public static final String UNENCODABLE_FIELD_ERROR_CONFIG_KEY =
"unencodableFieldError";
+ // Preserve the topic name as a column in each destination row. If null, the
topic name will not be preserved.
+ public static final String TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY =
"topicNameDestinationColumn";
private static final Logger LOGGER =
LoggerFactory.getLogger(CLPLogRecordExtractorConfig.class);
@@ -56,6 +58,7 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
private String _unencodableFieldSuffix = null;
private String _unencodableFieldError = null;
private boolean _removeProcessedFields = false;
+ private String _topicNameDestinationColumn = null;
@Override
public void init(Map<String, String> props) {
@@ -64,6 +67,15 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
return;
}
+ String topicNameDestinationColumn =
props.get(TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY);
+ if (null != topicNameDestinationColumn) {
+ if (topicNameDestinationColumn.length() == 0) {
+ LOGGER.warn("Ignoring empty value for {}",
TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY);
+ } else {
+ _topicNameDestinationColumn = topicNameDestinationColumn;
+ }
+ }
+
String concatenatedFieldNames =
props.get(FIELDS_FOR_CLP_ENCODING_CONFIG_KEY);
if (null == concatenatedFieldNames) {
return;
@@ -114,4 +126,7 @@ public class CLPLogRecordExtractorConfig implements RecordExtractorConfig {
public String getUnencodableFieldError() {
return _unencodableFieldError;
}
+ public String getTopicNameDestinationColumn() {
+ return _topicNameDestinationColumn;
+ }
}
diff --git a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
index 2e002cdb0c..1aeb5a3fd9 100644
--- a/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-clp-log/src/test/java/org/apache/pinot/plugin/inputformat/clplog/CLPLogRecordExtractorTest.java
@@ -52,6 +52,8 @@ public class CLPLogRecordExtractorTest {
private static final String _MESSAGE_2_FIELD_NAME = "message2";
private static final String _MESSAGE_2_FIELD_VALUE = "Stopped job_123 on
node-987: 3 cores, 6 threads and "
+ "22.0% memory used.";
+ private static final String _TOPIC_NAME = "test-topic";
+ private static final String _TOPIC_NAME_DEST_COLUMN = "_streamTopicName";
@Test
public void testCLPEncoding() {
@@ -147,6 +149,23 @@ public class CLPLogRecordExtractorTest {
assertEquals(row.getValue(_MESSAGE_2_FIELD_NAME), _MESSAGE_2_FIELD_VALUE);
}
+ @Test
+ public void testPreserveTopicName() {
+ Map<String, String> props = new HashMap<>();
+ Set<String> fieldsToRead = new HashSet<>();
+ fieldsToRead.add(_MESSAGE_1_FIELD_NAME);
+
+ // Test null topicNameDestinationColumn config
+ GenericRow row;
+ row = extract(props, fieldsToRead);
+ assertNull(row.getValue(_TOPIC_NAME_DEST_COLUMN));
+
+ // Test with valid topicNameDestinationColumn config
+
props.put(CLPLogRecordExtractorConfig.TOPIC_NAME_DESTINATION_COLUMN_CONFIG_KEY,
_TOPIC_NAME_DEST_COLUMN);
+ row = extract(props, fieldsToRead);
+ assertEquals(row.getValue(_TOPIC_NAME_DEST_COLUMN), _TOPIC_NAME);
+ }
+
private void addCLPEncodedField(String fieldName, Set<String> fields) {
fields.add(fieldName + ClpRewriter.LOGTYPE_COLUMN_SUFFIX);
fields.add(fieldName + ClpRewriter.DICTIONARY_VARS_COLUMN_SUFFIX);
@@ -157,7 +176,7 @@ public class CLPLogRecordExtractorTest {
CLPLogRecordExtractorConfig extractorConfig = new
CLPLogRecordExtractorConfig();
CLPLogRecordExtractor extractor = new CLPLogRecordExtractor();
extractorConfig.init(props);
- extractor.init(fieldsToRead, extractorConfig);
+ extractor.init(fieldsToRead, extractorConfig, _TOPIC_NAME);
// Assemble record
Map<String, Object> record = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]