npawar commented on a change in pull request #6718:
URL: https://github.com/apache/incubator-pinot/pull/6718#discussion_r603586428
##########
File path:
pinot-core/src/main/java/org/apache/pinot/core/util/IngestionUtils.java
##########
@@ -18,25 +18,208 @@
*/
package org.apache.pinot.core.util;
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.core.data.function.FunctionEvaluator;
import org.apache.pinot.core.data.function.FunctionEvaluatorFactory;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.ingestion.batch.BatchConfig;
+import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
+import org.apache.pinot.spi.utils.retry.RetriableOperationException;
+import org.apache.pinot.spi.utils.retry.RetryPolicies;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Utility methods for extracting source and destination fields from ingestion
configs
+ * Helper methods for ingestion
*/
-public class IngestionUtils {
+public final class IngestionUtils {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionUtils.class);
+
+ private static final String DEFAULT_SEGMENT_NAME_GENERATOR_TYPE =
+ BatchConfigProperties.SegmentNameGeneratorType.SIMPLE;
+ private static final long DEFAULT_RETRY_WAIT_MS = 1000L;
+ private static final int DEFAULT_ATTEMPTS = 3;
+ private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT =
new FileUploadDownloadClient();
+
+ private IngestionUtils() {
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig and schema.
+ * All properties are taken from the 1st Map in tableConfig ->
ingestionConfig -> batchIngestionConfig -> batchConfigMaps
+ * @param tableConfig tableConfig with the batchConfigMap set
+ * @param schema pinot schema
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema)
+ throws IOException, ClassNotFoundException {
+ Preconditions.checkNotNull(tableConfig.getIngestionConfig(),
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ return generateSegmentGeneratorConfig(tableConfig, schema,
+ tableConfig.getIngestionConfig().getBatchIngestionConfig());
+ }
+
+ /**
+ * Create {@link SegmentGeneratorConfig} using tableConfig, schema and
batchIngestionConfig.
+ * The provided batchIngestionConfig will take precedence over the one in
tableConfig
+ */
+ public static SegmentGeneratorConfig
generateSegmentGeneratorConfig(TableConfig tableConfig, Schema schema,
+ BatchIngestionConfig batchIngestionConfig)
+ throws ClassNotFoundException, IOException {
+ Preconditions.checkNotNull(batchIngestionConfig,
+ "Must provide batchIngestionConfig in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+
Preconditions.checkState(CollectionUtils.isNotEmpty(batchIngestionConfig.getBatchConfigMaps()),
+ "Must provide batchConfigMap in tableConfig for table: %s, for
generating SegmentGeneratorConfig",
+ tableConfig.getTableName());
+ BatchConfig batchConfig =
+ new BatchConfig(tableConfig.getTableName(),
batchIngestionConfig.getBatchConfigMaps().get(0));
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
+
+ // Input/output configs
+ segmentGeneratorConfig.setInputFilePath(batchConfig.getInputDirURI());
+ segmentGeneratorConfig.setOutDir(batchConfig.getOutputDirURI());
+
+ // Reader configs
+ segmentGeneratorConfig
+
.setRecordReaderPath(RecordReaderFactory.getRecordReaderClassName(batchConfig.getInputFormat().toString()));
+ Map<String, String> recordReaderProps = batchConfig.getRecordReaderProps();
+
segmentGeneratorConfig.setReaderConfig(RecordReaderFactory.getRecordReaderConfig(batchConfig.getInputFormat(),
+ IngestionConfigUtils.getRecordReaderProps(recordReaderProps)));
+
+ // Segment name generator configs
+ SegmentNameGenerator segmentNameGenerator =
+ getSegmentNameGenerator(batchConfig,
batchIngestionConfig.getSegmentIngestionType(),
+ batchIngestionConfig.getSegmentIngestionFrequency(), tableConfig,
schema);
+ segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
+ String sequenceId = batchConfig.getSequenceId();
+ if (StringUtils.isNumeric(sequenceId)) {
+ segmentGeneratorConfig.setSequenceId(Integer.parseInt(sequenceId));
+ }
+
+ return segmentGeneratorConfig;
+ }
+
+ private static SegmentNameGenerator getSegmentNameGenerator(BatchConfig
batchConfig, String pushType,
+ String pushFrequency, TableConfig tableConfig, Schema schema) {
+
+ String rawTableName =
TableNameBuilder.extractRawTableName(batchConfig.getTableNameWithType());
+ String segmentNameGeneratorType =
batchConfig.getSegmentNameGeneratorType();
+ if (segmentNameGeneratorType == null) {
+ segmentNameGeneratorType = DEFAULT_SEGMENT_NAME_GENERATOR_TYPE;
+ }
+ switch (segmentNameGeneratorType) {
+ case BatchConfigProperties.SegmentNameGeneratorType.FIXED:
+ return new FixedSegmentNameGenerator(batchConfig.getSegmentName());
+
+ case BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE:
+ DateTimeFormatSpec dateTimeFormatSpec = null;
+ String timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
+ if (timeColumnName != null) {
Review comment:
That is already handled. The normalized segment name generator has
DateTimeFormatSpec annotated as Nullable, and handles a null value internally.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]