klsince commented on a change in pull request #7299:
URL: https://github.com/apache/pinot/pull/7299#discussion_r694335606
##########
File path:
pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-ingestion-common/src/main/java/org/apache/pinot/ingestion/jobs/SegmentPreprocessingJob.java
##########
@@ -90,4 +119,131 @@ protected boolean isDataFile(String fileName) {
// TODO: support orc format in the future.
return fileName.endsWith(".avro");
}
+
+ protected void setTableConfigAndSchema()
+ throws IOException {
+ _tableConfig = getTableConfig();
+ _pinotTableSchema = getSchema();
+
+ Preconditions.checkState(_tableConfig != null, "Table config cannot be
null.");
+ Preconditions.checkState(_pinotTableSchema != null, "Schema cannot be
null");
+ }
+
+ protected void fetchPreProcessingOperations() {
+ _preprocessingOperations = new HashSet<>();
+ TableCustomConfig customConfig = _tableConfig.getCustomConfig();
+ if (customConfig != null) {
+ Map<String, String> customConfigMap = customConfig.getCustomConfigs();
+ if (customConfigMap != null && !customConfigMap.isEmpty()) {
+ String preprocessingOperationsString =
+
customConfigMap.getOrDefault(InternalConfigConstants.PREPROCESS_OPERATIONS, "");
+ DataPreprocessingUtils.getOperations(_preprocessingOperations,
preprocessingOperationsString);
+ }
+ }
+ }
+
+ protected void fetchPartitioningConfig() {
+ // Fetch partition info from table config.
+ if
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.PARTITION))
{
+ LOGGER.info("Partitioning is disabled.");
+ return;
+ }
+ SegmentPartitionConfig segmentPartitionConfig =
_tableConfig.getIndexingConfig().getSegmentPartitionConfig();
+ if (segmentPartitionConfig != null) {
+ Map<String, ColumnPartitionConfig> columnPartitionMap =
segmentPartitionConfig.getColumnPartitionMap();
+ Preconditions
+ .checkArgument(columnPartitionMap.size() <= 1, "There should be at
most 1 partition setting in the table.");
+ if (columnPartitionMap.size() == 1) {
+ _partitionColumn = columnPartitionMap.keySet().iterator().next();
+ _numPartitions =
segmentPartitionConfig.getNumPartitions(_partitionColumn);
+ _partitionFunction =
segmentPartitionConfig.getFunctionName(_partitionColumn);
+ }
+ } else {
+ LOGGER.info("Segment partition config is null for table: {}",
_tableConfig.getTableName());
+ }
+ }
+
+ protected void fetchSortingConfig() {
+ if
(!_preprocessingOperations.contains(DataPreprocessingUtils.Operation.SORT)) {
+ LOGGER.info("Sorting is disabled.");
+ return;
+ }
+ // Fetch sorting info from table config first.
+ List<String> sortingColumns = new ArrayList<>();
+ List<FieldConfig> fieldConfigs = _tableConfig.getFieldConfigList();
+ if (fieldConfigs != null && !fieldConfigs.isEmpty()) {
+ for (FieldConfig fieldConfig : fieldConfigs) {
+ if (fieldConfig.getIndexType() == FieldConfig.IndexType.SORTED) {
+ sortingColumns.add(fieldConfig.getName());
+ }
+ }
+ }
+ if (!sortingColumns.isEmpty()) {
+ Preconditions.checkArgument(sortingColumns.size() == 1, "There should be
at most 1 sorted column in the table.");
Review comment:
Ack, an if-check would be needed for `.get(0)` to be safe. I was mainly
concerned that the error message `at most 1` and the `== 1` check are a bit
inconsistent. I was suggesting something like L191 and L192 below.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]