From Ayush Tripathi <ayush.tripa...@couchbase.com>: Ayush Tripathi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140 )
Change subject: [ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS) ...................................................................... [ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS) - user model changes: no - storage format changes: no - interface changes: yes Ext-ref: MB-64376 Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747 --- M asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java 8 files changed, 400 insertions(+), 237 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/40/19140/1 diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java index 8dc820b..0902441 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java +++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java @@ -18,208 +18,45 @@ */ package org.apache.asterix.external.input.record.reader.aws.delta; -import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; -import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; +import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; -import java.util.Set; -import org.apache.asterix.common.cluster.IClusterStateManager; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; -import org.apache.asterix.external.api.IExternalDataRuntimeContext; -import org.apache.asterix.external.api.IRecordReader; -import org.apache.asterix.external.api.IRecordReaderFactory; -import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext; import org.apache.asterix.external.util.ExternalDataConstants; -import org.apache.asterix.external.util.HDFSUtils; import org.apache.asterix.external.util.aws.s3.S3Constants; -import org.apache.asterix.om.types.ARecordType; -import org.apache.asterix.runtime.projection.FunctionCallInformation; -import org.apache.commons.lang3.ArrayUtils; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import 
org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; -import org.apache.hyracks.api.exceptions.Warning; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import org.apache.hyracks.hdfs.dataflow.ConfFactory; -import io.delta.kernel.Scan; -import io.delta.kernel.Snapshot; -import io.delta.kernel.data.FilteredColumnarBatch; -import io.delta.kernel.data.Row; -import io.delta.kernel.defaults.engine.DefaultEngine; -import io.delta.kernel.engine.Engine; -import io.delta.kernel.exceptions.KernelException; -import io.delta.kernel.internal.InternalScanFileUtils; -import io.delta.kernel.types.StructType; -import io.delta.kernel.utils.CloseableIterator; -import io.delta.kernel.utils.FileStatus; - -public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { - +public class AwsS3DeltaReaderFactory extends DeltaReaderFactory { private static final long serialVersionUID = 1L; private static final List<String> recordReaderNames = Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3); - private static final Logger LOGGER = LogManager.getLogger(); - private transient AlgebricksAbsolutePartitionConstraint locationConstraints; - private String scanState; - private Map<String, String> configuration; - protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); - - @Override - public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { - return locationConstraints; - } @Override public void configure(IServiceContext serviceCtx, Map<String, String> configuration, IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException { -
this.configuration = configuration; - Configuration conf = new Configuration(); - configurationBuilder(configuration, conf); - String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + JobConf conf = new JobConf(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + AlgebricksAbsolutePartitionConstraint clusterLocations = + appCtx.getDataPartitioningProvider().getClusterLocations(); + int numPartitions = clusterLocations.getLocations().length; + configureAwsS3HdfsJobConf(conf, configuration, numPartitions); + ConfFactory config = new ConfFactory(conf); + setConfFactory(config); + tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); - - ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); - - Engine engine = DefaultEngine.create(conf); - io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); - Snapshot snapshot; - try { - snapshot = table.getLatestSnapshot(engine); - } catch (KernelException e) { - LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); - throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); - } - - List<Warning> warnings = new ArrayList<>(); - DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings); - AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext); - StructType requiredSchema; - try { - ARecordType expectedType = HDFSUtils.getExpectedType(conf); - Map<String, FunctionCallInformation> functionCallInformationMap = - HDFSUtils.getFunctionCallInformationMap(conf); - StructType fileSchema = snapshot.getSchema(engine); - requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap); - } catch (IOException e) { - 
throw new RuntimeException(e); - } catch (AsterixDeltaRuntimeException e) { - throw e.getHyracksDataException(); - } - Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); - scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); - CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); - - List<Row> scanFiles = new ArrayList<>(); - while (iter.hasNext()) { - FilteredColumnarBatch batch = iter.next(); - CloseableIterator<Row> rowIter = batch.getRows(); - while (rowIter.hasNext()) { - Row row = rowIter.next(); - scanFiles.add(row); - } - } - locationConstraints = configureLocationConstraints(appCtx, scanFiles); - configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); - distributeFiles(scanFiles); - issueWarnings(warnings, warningCollector); - } - - private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) { - if (!warnings.isEmpty()) { - for (Warning warning : warnings) { - if (warningCollector.shouldWarn()) { - warningCollector.warn(warning); - } - } - } - warnings.clear(); - } - - private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx, - List<Row> scanFiles) { - IClusterStateManager csm = appCtx.getClusterStateManager(); - - String[] locations = csm.getClusterLocations().getLocations(); - if (scanFiles.size() == 0) { - return AlgebricksAbsolutePartitionConstraint.randomLocation(locations); - } else if (locations.length > scanFiles.size()) { - LOGGER.debug( - "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count", - locations.length, scanFiles.size()); - final String[] locationCopy = locations.clone(); - ArrayUtils.shuffle(locationCopy); - locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size()); - } - return new AlgebricksAbsolutePartitionConstraint(locations); - } - - private void distributeFiles(List<Row> 
scanFiles) { - final int partitionsCount = getPartitionConstraint().getLocations().length; - PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount, - Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize)); - - // Prepare the workloads based on the number of partitions - for (int i = 0; i < partitionsCount; i++) { - workloadQueue.add(new PartitionWorkLoadBasedOnSize()); - } - for (Row scanFileRow : scanFiles) { - PartitionWorkLoadBasedOnSize workload = workloadQueue.poll(); - FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); - workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize()); - workloadQueue.add(workload); - } - partitionWorkLoadsBasedOnSize.addAll(workloadQueue); - } - - public static void configurationBuilder(Map<String, String> configuration, Configuration conf) { - conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); - conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); - if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) { - conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME)); - } - conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME)); - String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); - if (serviceEndpoint != null) { - conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); - } - conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, - configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, "")); - conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, - configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, "")); - } - - @Override - public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws 
HyracksDataException { - try { - int partition = context.getPartition(); - return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState, - configuration, context); - } catch (Exception e) { - throw HyracksDataException.create(e); - } - } - - @Override - public Class<?> getRecordClass() throws AsterixException { - return Row.class; + super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory); } @Override @@ -227,36 +64,4 @@ return recordReaderNames; } - @Override - public Set<String> getReaderSupportedFormats() { - return Collections.singleton(ExternalDataConstants.FORMAT_DELTA); - } - - public static class PartitionWorkLoadBasedOnSize implements Serializable { - private static final long serialVersionUID = 1L; - private final List<String> scanFiles = new ArrayList<>(); - private long totalSize = 0; - - public PartitionWorkLoadBasedOnSize() { - } - - public List<String> getScanFiles() { - return scanFiles; - } - - public void addScanFile(String scanFile, long size) { - this.scanFiles.add(scanFile); - this.totalSize += size; - } - - public long getTotalSize() { - return totalSize; - } - - @Override - public String toString() { - return "Files: " + scanFiles.size() + ", Total Size: " + totalSize; - } - } - -} +} \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java index a5b21b6..87e8d85 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java @@ -19,12 +19,10 @@ package org.apache.asterix.external.input.record.reader.aws.delta; 
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator; -import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.Optional; import org.apache.asterix.external.api.IExternalDataRuntimeContext; @@ -33,10 +31,10 @@ import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController; import org.apache.asterix.external.input.record.GenericRecord; import org.apache.asterix.external.util.IFeedLogManager; -import org.apache.asterix.external.util.aws.s3.S3Constants; -import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.hdfs.dataflow.ConfFactory; import io.delta.kernel.Scan; import io.delta.kernel.data.ColumnarBatch; @@ -69,20 +67,10 @@ private Row scanFile; private CloseableIterator<Row> rows; - public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf, - IExternalDataRuntimeContext context) { - Configuration config = new Configuration(); - config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); - config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); - if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) { - config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME)); - } - config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME)); - String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME); - if (serviceEndpoint != null) { - config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); - } - this.engine = DefaultEngine.create(config); + public DeltaFileRecordReader(List<String> serScanFiles, String 
serScanState, ConfFactory config, + IExternalDataRuntimeContext context) throws HyracksDataException { + JobConf conf = config.getConf(); + this.engine = DefaultEngine.create(conf); this.scanFiles = new ArrayList<>(); for (String scanFile : serScanFiles) { this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile)); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java new file mode 100644 index 0000000..7ad1b84 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java @@ -0,0 +1,267 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; +import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; + +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; + +import org.apache.asterix.common.cluster.IClusterStateManager; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.exceptions.ErrorCode; +import org.apache.asterix.common.exceptions.RuntimeDataException; +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.api.IExternalDataRuntimeContext; +import org.apache.asterix.external.api.IRecordReader; +import org.apache.asterix.external.api.IRecordReaderFactory; +import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.HDFSUtils; +import org.apache.asterix.external.util.aws.s3.S3Constants; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.runtime.projection.FunctionCallInformation; +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.Warning; +import 
org.apache.hyracks.hdfs.dataflow.ConfFactory; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.engine.DefaultEngine; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.exceptions.KernelException; +import io.delta.kernel.internal.InternalScanFileUtils; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; + +public class DeltaReaderFactory implements IRecordReaderFactory<Object> { + + private static final long serialVersionUID = 1L; + private static List<String> recordReaderNames; + private static final Logger LOGGER = LogManager.getLogger(); + private transient AlgebricksAbsolutePartitionConstraint locationConstraints; + private String scanState; + private Map<String, String> configuration; + protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>(); + protected ConfFactory confFactory; + protected String tableMetadataPath; + + @Override + public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() { + return locationConstraints; + } + + public void setRecordReaderNames(List<String> readerNames) { + recordReaderNames = readerNames; + } + + public void setConfFactory(ConfFactory config) { + this.confFactory = config; + } + + @Override + public void configure(IServiceContext serviceCtx, Map<String, String> configuration, + IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) + throws AlgebricksException, HyracksDataException { + this.configuration = configuration; + Configuration conf = confFactory.getConf(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + + Engine engine = DefaultEngine.create(conf); + 
io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); + Snapshot snapshot; + try { + snapshot = table.getLatestSnapshot(engine); + } catch (KernelException e) { + LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e); + throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e)); + } + + List<Warning> warnings = new ArrayList<>(); + DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings); + AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext); + StructType requiredSchema; + try { + ARecordType expectedType = HDFSUtils.getExpectedType(conf); + Map<String, FunctionCallInformation> functionCallInformationMap = + HDFSUtils.getFunctionCallInformationMap(conf); + StructType fileSchema = snapshot.getSchema(engine); + requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap); + } catch (IOException e) { + throw HyracksDataException.create(e); + } catch (AsterixDeltaRuntimeException e) { + throw e.getHyracksDataException(); + } + Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); + scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); + CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); + + List<Row> scanFiles = new ArrayList<>(); + while (iter.hasNext()) { + FilteredColumnarBatch batch = iter.next(); + CloseableIterator<Row> rowIter = batch.getRows(); + while (rowIter.hasNext()) { + Row row = rowIter.next(); + scanFiles.add(row); + } + } + locationConstraints = configureLocationConstraints(appCtx, scanFiles); + configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); + distributeFiles(scanFiles); + issueWarnings(warnings, warningCollector); + } + + private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) { + if (!warnings.isEmpty()) { + for
(Warning warning : warnings) { + if (warningCollector.shouldWarn()) { + warningCollector.warn(warning); + } + } + } + warnings.clear(); + } + + private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx, + List<Row> scanFiles) { + IClusterStateManager csm = appCtx.getClusterStateManager(); + + String[] locations = csm.getClusterLocations().getLocations(); + if (scanFiles.size() == 0) { + return AlgebricksAbsolutePartitionConstraint.randomLocation(locations); + } else if (locations.length > scanFiles.size()) { + LOGGER.debug( + "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count", + locations.length, scanFiles.size()); + final String[] locationCopy = locations.clone(); + ArrayUtils.shuffle(locationCopy); + locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size()); + } + return new AlgebricksAbsolutePartitionConstraint(locations); + } + + private void distributeFiles(List<Row> scanFiles) { + final int partitionsCount = getPartitionConstraint().getLocations().length; + PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount, + Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize)); + + // Prepare the workloads based on the number of partitions + for (int i = 0; i < partitionsCount; i++) { + workloadQueue.add(new PartitionWorkLoadBasedOnSize()); + } + for (Row scanFileRow : scanFiles) { + PartitionWorkLoadBasedOnSize workload = workloadQueue.poll(); + FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow); + workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize()); + workloadQueue.add(workload); + } + partitionWorkLoadsBasedOnSize.addAll(workloadQueue); + } + + public static void configurationBuilder(Map<String, String> configuration, Configuration conf) { + conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME)); + conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME)); + if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) { + conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME)); + } + conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME)); + String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME); + if (serviceEndpoint != null) { + conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); + } + conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, + configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, "")); + conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, + configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, "")); + } + + @Override + public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException { + try { + int partition = context.getPartition(); + return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState, + confFactory, context); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + + @Override + public Class<?> getRecordClass() throws AsterixException { + return Row.class; + } + + @Override + public List<String> getRecordReaderNames() { + return recordReaderNames; + } + + @Override + public Set<String> getReaderSupportedFormats() { + return Collections.singleton(ExternalDataConstants.FORMAT_DELTA); + } + + public static class PartitionWorkLoadBasedOnSize implements Serializable { + private static final long serialVersionUID = 1L; + private final List<String> scanFiles = new ArrayList<>(); + private long totalSize = 0; + + public PartitionWorkLoadBasedOnSize() { + } + + public List<String> getScanFiles() { + return scanFiles; + } + + public void 
addScanFile(String scanFile, long size) { + this.scanFiles.add(scanFile); + this.totalSize += size; + } + + public long getTotalSize() { + return totalSize; + } + + @Override + public String toString() { + return "Files: " + scanFiles.size() + ", Total Size: " + totalSize; + } + } + +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java new file mode 100644 index 0000000..a90f044 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.external.input.record.reader.gcs.delta; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; +import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory; +import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.google.gcs.GCSConstants; +import org.apache.asterix.external.util.google.gcs.GCSUtils; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.api.application.IServiceContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.hdfs.dataflow.ConfFactory; + +public class GCSDeltaReaderFactory extends DeltaReaderFactory { + private static final long serialVersionUID = 1L; + private static final List<String> recordReaderNames = + Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS); + + @Override + public void configure(IServiceContext serviceCtx, Map<String, String> configuration, + IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory) + throws AlgebricksException, HyracksDataException { + JobConf conf = new JobConf(); + ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); + AlgebricksAbsolutePartitionConstraint clusterLocations = + appCtx.getDataPartitioningProvider().getClusterLocations(); + int numPartitions = clusterLocations.getLocations().length; + GCSUtils.configureHdfsJobConf(conf, configuration, numPartitions); + ConfFactory config = new ConfFactory(conf); + setConfFactory(config); + tableMetadataPath = 
GCSConstants.HADOOP_GCS_PROTOCOL + "://" + + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory); + } + + @Override + public List<String> getRecordReaderNames() { + return recordReaderNames; + } + +} \ No newline at end of file diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index f06638d..92e954b 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -29,8 +29,10 @@ import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END; import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START; +import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties; import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties; +import static org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf; import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE; @@ -72,7 +74,6 @@ import org.apache.asterix.external.api.IInputStreamFactory; import org.apache.asterix.external.api.IRecordReaderFactory; import 
org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher; -import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory; import org.apache.asterix.external.library.JavaLibrary; import org.apache.asterix.external.library.msgpack.MessagePackUtils; import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions; @@ -90,6 +91,7 @@ import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo; import org.apache.asterix.runtime.projection.FunctionCallInformation; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -515,15 +517,25 @@ } } - public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException { - Configuration conf = new Configuration(); + public static void validateDeltaTableExists(Map<String, String> configuration) throws AlgebricksException { String tableMetadataPath = null; + JobConf conf = new JobConf(); if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE) .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) { - AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf); + configureAwsS3HdfsJobConf(conf, configuration, 500); tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + } else if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE) + .equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) { + configureHdfsJobConf(conf, configuration, 500); + tableMetadataPath = GCSConstants.HADOOP_GCS_PROTOCOL + "://" + + 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); + } else { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, + "Delta format is not supported for the external source type: " + + configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)); } Engine engine = DefaultEngine.create(conf); io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java index f36d25d..2590e73 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java @@ -470,7 +470,11 @@ throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container); } if (isDeltaTable(configuration)) { - validateDeltaTableExists(configuration); + try { + validateDeltaTableExists(configuration); + } catch (AlgebricksException e) { + throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e); + } } } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java index 74a664d..0fb4fec 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java @@ -22,6 +22,8 @@ import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE; import static 
org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT; import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix; +import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable; +import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties; import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude; import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME; import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME; @@ -140,9 +142,11 @@ */ public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc, IWarningCollector collector) throws CompilationException { - + if (isDeltaTable(configuration)) { + validateDeltaTableProperties(configuration); + } // check if the format property is present - if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { + else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) { throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT); } diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory index 2c15b5a..1f25c4b 100644 --- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory +++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory @@ -28,4 +28,5 @@ org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory 
org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory -org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory \ No newline at end of file +org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory +org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory \ No newline at end of file -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747 Gerrit-Change-Number: 19140 Gerrit-PatchSet: 1 Gerrit-Owner: Ayush Tripathi <ayush.tripa...@couchbase.com> Gerrit-MessageType: newchange