From Ayush Tripathi <ayush.tripa...@couchbase.com>:

Ayush Tripathi has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140 )


Change subject: [ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS)
......................................................................

[ASTERIXDB-3503][EXT] Introduce Delta Lake Support for Google Cloud Storage (GCS)

- user model changes: no
- storage format changes: no
- interface changes: yes

Ext-ref: MB-64376

Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747
---
M asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
M asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
A asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
8 files changed, 400 insertions(+), 237 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/40/19140/1

diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
index 8dc820b..0902441 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java
@@ -18,208 +18,45 @@
  */
 package org.apache.asterix.external.input.record.reader.aws.delta;

-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
-import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;

-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Set;

-import org.apache.asterix.common.cluster.IClusterStateManager;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
 import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
-import org.apache.asterix.external.api.IExternalDataRuntimeContext;
-import org.apache.asterix.external.api.IRecordReader;
-import org.apache.asterix.external.api.IRecordReaderFactory;
-import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
 import org.apache.asterix.external.util.ExternalDataConstants;
-import org.apache.asterix.external.util.HDFSUtils;
 import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.asterix.om.types.ARecordType;
-import org.apache.asterix.runtime.projection.FunctionCallInformation;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.IServiceContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.IWarningCollector;
-import org.apache.hyracks.api.exceptions.Warning;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;

-import io.delta.kernel.Scan;
-import io.delta.kernel.Snapshot;
-import io.delta.kernel.data.FilteredColumnarBatch;
-import io.delta.kernel.data.Row;
-import io.delta.kernel.defaults.engine.DefaultEngine;
-import io.delta.kernel.engine.Engine;
-import io.delta.kernel.exceptions.KernelException;
-import io.delta.kernel.internal.InternalScanFileUtils;
-import io.delta.kernel.types.StructType;
-import io.delta.kernel.utils.CloseableIterator;
-import io.delta.kernel.utils.FileStatus;
-
-public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> {
-
+public class AwsS3DeltaReaderFactory extends DeltaReaderFactory {
     private static final long serialVersionUID = 1L;
-    private static final List<String> recordReaderNames =
+    private static List<String> recordReaderNames =
             Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
-    private static final Logger LOGGER = LogManager.getLogger();
-    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
-    private String scanState;
-    private Map<String, String> configuration;
-    protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
-
-    @Override
-    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
-        return locationConstraints;
-    }

     @Override
     public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
             IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
             throws AlgebricksException, HyracksDataException {
-        this.configuration = configuration;
-        Configuration conf = new Configuration();
-        configurationBuilder(configuration, conf);
-        String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
+        JobConf conf = new JobConf();
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        AlgebricksAbsolutePartitionConstraint clusterLocations =
+                appCtx.getDataPartitioningProvider().getClusterLocations();
+        int numPartitions = clusterLocations.getLocations().length;
+        configureAwsS3HdfsJobConf(conf, configuration, numPartitions);
+        ConfFactory config = new ConfFactory(conf);
+        setConfFactory(config);
+        tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                 + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                 + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
-
-        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
-
-        Engine engine = DefaultEngine.create(conf);
-        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
-        Snapshot snapshot;
-        try {
-            snapshot = table.getLatestSnapshot(engine);
-        } catch (KernelException e) {
-            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
-            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
-        }
-
-        List<Warning> warnings = new ArrayList<>();
-        DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
-        AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
-        StructType requiredSchema;
-        try {
-            ARecordType expectedType = HDFSUtils.getExpectedType(conf);
-            Map<String, FunctionCallInformation> functionCallInformationMap =
-                    HDFSUtils.getFunctionCallInformationMap(conf);
-            StructType fileSchema = snapshot.getSchema(engine);
-            requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        } catch (AsterixDeltaRuntimeException e) {
-            throw e.getHyracksDataException();
-        }
-        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
-        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
-        CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
-
-        List<Row> scanFiles = new ArrayList<>();
-        while (iter.hasNext()) {
-            FilteredColumnarBatch batch = iter.next();
-            CloseableIterator<Row> rowIter = batch.getRows();
-            while (rowIter.hasNext()) {
-                Row row = rowIter.next();
-                scanFiles.add(row);
-            }
-        }
-        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
-        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
-        distributeFiles(scanFiles);
-        issueWarnings(warnings, warningCollector);
-    }
-
-    private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
-        if (!warnings.isEmpty()) {
-            for (Warning warning : warnings) {
-                if (warningCollector.shouldWarn()) {
-                    warningCollector.warn(warning);
-                }
-            }
-        }
-        warnings.clear();
-    }
-
-    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
-            List<Row> scanFiles) {
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-
-        String[] locations = csm.getClusterLocations().getLocations();
-        if (scanFiles.size() == 0) {
-            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
-        } else if (locations.length > scanFiles.size()) {
-            LOGGER.debug(
-                    "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count",
-                    locations.length, scanFiles.size());
-            final String[] locationCopy = locations.clone();
-            ArrayUtils.shuffle(locationCopy);
-            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
-        }
-        return new AlgebricksAbsolutePartitionConstraint(locations);
-    }
-
-    private void distributeFiles(List<Row> scanFiles) {
-        final int partitionsCount = getPartitionConstraint().getLocations().length;
-        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
-                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
-
-        // Prepare the workloads based on the number of partitions
-        for (int i = 0; i < partitionsCount; i++) {
-            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
-        }
-        for (Row scanFileRow : scanFiles) {
-            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
-            FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
-            workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
-            workloadQueue.add(workload);
-        }
-        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
-    }
-
-    public static void configurationBuilder(Map<String, String> configuration, Configuration conf) {
-        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
-                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
-        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
-                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
-    }
-
-    @Override
-    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
-        try {
-            int partition = context.getPartition();
-            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
-                    configuration, context);
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
-        }
-    }
-
-    @Override
-    public Class<?> getRecordClass() throws AsterixException {
-        return Row.class;
+        super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
     }

     @Override
@@ -227,36 +64,4 @@
         return recordReaderNames;
     }

-    @Override
-    public Set<String> getReaderSupportedFormats() {
-        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
-    }
-
-    public static class PartitionWorkLoadBasedOnSize implements Serializable {
-        private static final long serialVersionUID = 1L;
-        private final List<String> scanFiles = new ArrayList<>();
-        private long totalSize = 0;
-
-        public PartitionWorkLoadBasedOnSize() {
-        }
-
-        public List<String> getScanFiles() {
-            return scanFiles;
-        }
-
-        public void addScanFile(String scanFile, long size) {
-            this.scanFiles.add(scanFile);
-            this.totalSize += size;
-        }
-
-        public long getTotalSize() {
-            return totalSize;
-        }
-
-        @Override
-        public String toString() {
-            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
-        }
-    }
-
-}
+}
\ No newline at end of file
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
index a5b21b6..87e8d85 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaFileRecordReader.java
@@ -19,12 +19,10 @@
 package org.apache.asterix.external.input.record.reader.aws.delta;

 import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;
-import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;

 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;

 import org.apache.asterix.external.api.IExternalDataRuntimeContext;
@@ -33,10 +31,10 @@
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.input.record.GenericRecord;
 import org.apache.asterix.external.util.IFeedLogManager;
-import org.apache.asterix.external.util.aws.s3.S3Constants;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.data.std.primitive.VoidPointable;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;

 import io.delta.kernel.Scan;
 import io.delta.kernel.data.ColumnarBatch;
@@ -69,20 +67,10 @@
     private Row scanFile;
     private CloseableIterator<Row> rows;

-    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, Map<String, String> conf,
-            IExternalDataRuntimeContext context) {
-        Configuration config = new Configuration();
-        config.set(S3Constants.HADOOP_ACCESS_KEY_ID, conf.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
-        config.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, conf.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
-        if (conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
-            config.set(S3Constants.HADOOP_SESSION_TOKEN, conf.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
-        }
-        config.set(S3Constants.HADOOP_REGION, conf.get(S3Constants.REGION_FIELD_NAME));
-        String serviceEndpoint = conf.get(SERVICE_END_POINT_FIELD_NAME);
-        if (serviceEndpoint != null) {
-            config.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
-        }
-        this.engine = DefaultEngine.create(config);
+    public DeltaFileRecordReader(List<String> serScanFiles, String serScanState, ConfFactory config,
+            IExternalDataRuntimeContext context) throws HyracksDataException {
+        JobConf conf = config.getConf();
+        this.engine = DefaultEngine.create(conf);
         this.scanFiles = new ArrayList<>();
         for (String scanFile : serScanFiles) {
             this.scanFiles.add(RowSerDe.deserializeRowFromJson(scanFile));
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
new file mode 100644
index 0000000..7ad1b84
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/DeltaReaderFactory.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta;
+
+import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.api.IRecordReaderFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.HDFSUtils;
+import org.apache.asterix.external.util.aws.s3.S3Constants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.runtime.projection.FunctionCallInformation;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.engine.DefaultEngine;
+import io.delta.kernel.engine.Engine;
+import io.delta.kernel.exceptions.KernelException;
+import io.delta.kernel.internal.InternalScanFileUtils;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+
+public class DeltaReaderFactory implements IRecordReaderFactory<Object> {
+
+    private static final long serialVersionUID = 1L;
+    private static List<String> recordReaderNames;
+    private static final Logger LOGGER = LogManager.getLogger();
+    private transient AlgebricksAbsolutePartitionConstraint locationConstraints;
+    private String scanState;
+    private Map<String, String> configuration;
+    protected final List<PartitionWorkLoadBasedOnSize> partitionWorkLoadsBasedOnSize = new ArrayList<>();
+    protected ConfFactory confFactory;
+    protected String tableMetadataPath;
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return locationConstraints;
+    }
+
+    public void setRecordReaderNames(List<String> readerNames) {
+        recordReaderNames = readerNames;
+    }
+
+    public void setConfFactory(ConfFactory config) {
+        this.confFactory = config;
+    }
+
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+        this.configuration = configuration;
+        Configuration conf = confFactory.getConf();
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+
+        Engine engine = DefaultEngine.create(conf);
+        io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
+        Snapshot snapshot;
+        try {
+            snapshot = table.getLatestSnapshot(engine);
+        } catch (KernelException e) {
+            LOGGER.info("Failed to get latest snapshot for table: {}", tableMetadataPath, e);
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, e, getMessageOrToString(e));
+        }
+
+        List<Warning> warnings = new ArrayList<>();
+        DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings);
+        AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext);
+        StructType requiredSchema;
+        try {
+            ARecordType expectedType = HDFSUtils.getExpectedType(conf);
+            Map<String, FunctionCallInformation> functionCallInformationMap =
+                    HDFSUtils.getFunctionCallInformationMap(conf);
+            StructType fileSchema = snapshot.getSchema(engine);
+            requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        } catch (AsterixDeltaRuntimeException e) {
+            throw e.getHyracksDataException();
+        }
+        Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build();
+        scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine));
+        CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine);
+
+        List<Row> scanFiles = new ArrayList<>();
+        while (iter.hasNext()) {
+            FilteredColumnarBatch batch = iter.next();
+            CloseableIterator<Row> rowIter = batch.getRows();
+            while (rowIter.hasNext()) {
+                Row row = rowIter.next();
+                scanFiles.add(row);
+            }
+        }
+        locationConstraints = configureLocationConstraints(appCtx, scanFiles);
+        configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA);
+        distributeFiles(scanFiles);
+        issueWarnings(warnings, warningCollector);
+    }
+
+    private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) {
+        if (!warnings.isEmpty()) {
+            for (Warning warning : warnings) {
+                if (warningCollector.shouldWarn()) {
+                    warningCollector.warn(warning);
+                }
+            }
+        }
+        warnings.clear();
+    }
+
+    private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx,
+            List<Row> scanFiles) {
+        IClusterStateManager csm = appCtx.getClusterStateManager();
+
+        String[] locations = csm.getClusterLocations().getLocations();
+        if (scanFiles.size() == 0) {
+            return AlgebricksAbsolutePartitionConstraint.randomLocation(locations);
+        } else if (locations.length > scanFiles.size()) {
+            LOGGER.debug(
+                    "analytics partitions ({}) exceeds total partition count ({}); limiting ingestion partitions to total partition count",
+                    locations.length, scanFiles.size());
+            final String[] locationCopy = locations.clone();
+            ArrayUtils.shuffle(locationCopy);
+            locations = ArrayUtils.subarray(locationCopy, 0, scanFiles.size());
+        }
+        return new AlgebricksAbsolutePartitionConstraint(locations);
+    }
+
+    private void distributeFiles(List<Row> scanFiles) {
+        final int partitionsCount = getPartitionConstraint().getLocations().length;
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+        }
+        for (Row scanFileRow : scanFiles) {
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            FileStatus fileStatus = InternalScanFileUtils.getAddFileStatus(scanFileRow);
+            workload.addScanFile(RowSerDe.serializeRowToJson(scanFileRow), fileStatus.getSize());
+            workloadQueue.add(workload);
+        }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+    }
+
+    public static void configurationBuilder(Map<String, String> configuration, Configuration conf) {
+        conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, configuration.get(S3Constants.ACCESS_KEY_ID_FIELD_NAME));
+        conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY, configuration.get(S3Constants.SECRET_ACCESS_KEY_FIELD_NAME));
+        if (configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME) != null) {
+            conf.set(S3Constants.HADOOP_SESSION_TOKEN, configuration.get(S3Constants.SESSION_TOKEN_FIELD_NAME));
+        }
+        conf.set(S3Constants.HADOOP_REGION, configuration.get(S3Constants.REGION_FIELD_NAME));
+        String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
+        if (serviceEndpoint != null) {
+            conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint);
+        }
+        conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS,
+                configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, ""));
+        conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
+                configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, ""));
+    }
+
+    @Override
+    public IRecordReader<?> createRecordReader(IExternalDataRuntimeContext context) throws HyracksDataException {
+        try {
+            int partition = context.getPartition();
+            return new DeltaFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getScanFiles(), scanState,
+                    confFactory, context);
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Class<?> getRecordClass() throws AsterixException {
+        return Row.class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return Collections.singleton(ExternalDataConstants.FORMAT_DELTA);
+    }
+
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
+        private static final long serialVersionUID = 1L;
+        private final List<String> scanFiles = new ArrayList<>();
+        private long totalSize = 0;
+
+        public PartitionWorkLoadBasedOnSize() {
+        }
+
+        public List<String> getScanFiles() {
+            return scanFiles;
+        }
+
+        public void addScanFile(String scanFile, long size) {
+            this.scanFiles.add(scanFile);
+            this.totalSize += size;
+        }
+
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        @Override
+        public String toString() {
+            return "Files: " + scanFiles.size() + ", Total Size: " + totalSize;
+        }
+    }
+
+}
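
Note for reviewers: distributeFiles above is a greedy, size-balanced assignment — the partition with the smallest accumulated byte count always receives the next scan file. A self-contained sketch of the same strategy with hypothetical sizes (illustrative only, not code from this change):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class GreedyDistributionSketch {
        public static void main(String[] args) {
            long[] fileSizes = { 100, 80, 60, 40 }; // hypothetical scan-file sizes
            int partitions = 2;
            // Min-heap ordered by the total size already assigned to each workload.
            PriorityQueue<List<Long>> workloads = new PriorityQueue<>(partitions,
                    Comparator.comparingLong((List<Long> w) -> w.stream().mapToLong(Long::longValue).sum()));
            for (int i = 0; i < partitions; i++) {
                workloads.add(new ArrayList<>());
            }
            for (long size : fileSizes) {
                List<Long> lightest = workloads.poll(); // partition with the smallest load so far
                lightest.add(size);
                workloads.add(lightest);
            }
            workloads.forEach(System.out::println); // e.g. [100, 40] and [80, 60]
        }
    }
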
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
new file mode 100644
index 0000000..a90f044
--- /dev/null
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/gcs/delta/GCSDeltaReaderFactory.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.gcs.delta;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.google.gcs.GCSConstants;
+import org.apache.asterix.external.util.google.gcs.GCSUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.hyracks.hdfs.dataflow.ConfFactory;
+
+public class GCSDeltaReaderFactory extends DeltaReaderFactory {
+    private static final long serialVersionUID = 1L;
+    private static final List<String> recordReaderNames =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_GCS);
+
+    @Override
+    public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
+            IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
+            throws AlgebricksException, HyracksDataException {
+        JobConf conf = new JobConf();
+        ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext();
+        AlgebricksAbsolutePartitionConstraint clusterLocations =
+                appCtx.getDataPartitioningProvider().getClusterLocations();
+        int numPartitions = clusterLocations.getLocations().length;
+        GCSUtils.configureHdfsJobConf(conf, configuration, numPartitions);
+        ConfFactory config = new ConfFactory(conf);
+        setConfFactory(config);
+        tableMetadataPath = GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+                + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return recordReaderNames;
+    }
+
+}
\ No newline at end of file
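
For context, DeltaReaderFactory is effectively a template: a subclass only builds a store-specific JobConf, wraps it in a serializable ConfFactory, sets tableMetadataPath, and defers snapshot resolution, scan planning, and file distribution to super.configure — exactly as the S3 and GCS factories above do. A hypothetical third adapter would follow the same shape (OtherStoreDeltaReaderFactory, the "other" URI scheme, and the conf wiring are illustrative, not part of this change):

    import java.util.Map;

    import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
    import org.apache.asterix.external.input.record.reader.aws.delta.DeltaReaderFactory;
    import org.apache.asterix.external.util.ExternalDataConstants;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
    import org.apache.hyracks.api.application.IServiceContext;
    import org.apache.hyracks.api.exceptions.HyracksDataException;
    import org.apache.hyracks.api.exceptions.IWarningCollector;
    import org.apache.hyracks.hdfs.dataflow.ConfFactory;

    public class OtherStoreDeltaReaderFactory extends DeltaReaderFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public void configure(IServiceContext serviceCtx, Map<String, String> configuration,
                IWarningCollector warningCollector, IExternalFilterEvaluatorFactory filterEvaluatorFactory)
                throws AlgebricksException, HyracksDataException {
            JobConf conf = new JobConf();
            // 1. Populate conf with store-specific credentials/endpoints here.
            // 2. Hand the conf to the base class in serializable form.
            setConfFactory(new ConfFactory(conf));
            // 3. Point the Delta kernel at the table root, e.g. "other://container/definition".
            tableMetadataPath = "other://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME)
                    + '/' + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
            // 4. Let DeltaReaderFactory resolve the snapshot, plan the scan, and distribute files.
            super.configure(serviceCtx, configuration, warningCollector, filterEvaluatorFactory);
        }
    }
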
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index f06638d..92e954b 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -29,8 +29,10 @@
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_QUOTE;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_END;
 import static org.apache.asterix.external.util.ExternalDataConstants.KEY_RECORD_START;
+import static org.apache.asterix.external.util.aws.s3.S3AuthUtils.configureAwsS3HdfsJobConf;
 import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureBlobProperties;
 import static org.apache.asterix.external.util.azure.blob_storage.AzureUtils.validateAzureDataLakeProperties;
+import static org.apache.asterix.external.util.google.gcs.GCSUtils.configureHdfsJobConf;
 import static org.apache.asterix.external.util.google.gcs.GCSUtils.validateProperties;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
@@ -72,7 +74,6 @@
 import org.apache.asterix.external.api.IInputStreamFactory;
 import org.apache.asterix.external.api.IRecordReaderFactory;
 import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory.IncludeExcludeMatcher;
-import org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory;
 import org.apache.asterix.external.library.JavaLibrary;
 import org.apache.asterix.external.library.msgpack.MessagePackUtils;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
@@ -90,6 +91,7 @@
 import org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -515,15 +517,25 @@
         }
     }

-    public static void validateDeltaTableExists(Map<String, String> configuration) throws CompilationException {
-        Configuration conf = new Configuration();
+    public static void validateDeltaTableExists(Map<String, String> configuration) throws AlgebricksException {
         String tableMetadataPath = null;
+        JobConf conf = new JobConf();
         if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
                 .equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3)) {
-            AwsS3DeltaReaderFactory.configurationBuilder(configuration, conf);
+            configureAwsS3HdfsJobConf(conf, configuration, 500);
             tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://"
                     + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
                     + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        } else if (configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE)
+                .equals(ExternalDataConstants.KEY_ADAPTER_NAME_GCS)) {
+            configureHdfsJobConf(conf, configuration, 500);
+            tableMetadataPath = GCSConstants.HADOOP_GCS_PROTOCOL + "://"
+                    + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/'
+                    + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME);
+        } else {
+            throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
+                    "Delta format is not supported for the external source type: "
+                            + configuration.get(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE));
         }
         Engine engine = DefaultEngine.create(conf);
         io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath);
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
index f36d25d..2590e73 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3AuthUtils.java
@@ -470,7 +470,11 @@
             throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
         }
         if (isDeltaTable(configuration)) {
-            validateDeltaTableExists(configuration);
+            try {
+                validateDeltaTableExists(configuration);
+            } catch (AlgebricksException e) {
+                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, e);
+            }
         }
     }
 }
diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
index 74a664d..0fb4fec 100644
--- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
+++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/gcs/GCSUtils.java
@@ -22,6 +22,8 @@
 import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
 import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
 import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
+import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
+import static org.apache.asterix.external.util.ExternalDataUtils.validateDeltaTableProperties;
 import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.APPLICATION_DEFAULT_CREDENTIALS_FIELD_NAME;
 import static org.apache.asterix.external.util.google.gcs.GCSConstants.ENDPOINT_FIELD_NAME;
@@ -140,9 +142,11 @@
      */
     public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
             IWarningCollector collector) throws CompilationException {
-
+        if (isDeltaTable(configuration)) {
+            validateDeltaTableProperties(configuration);
+        }
         // check if the format property is present
-        if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+        else if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
         }

diff --git a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 2c15b5a..1f25c4b 100644
--- a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -28,4 +28,5 @@
 org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
 org.apache.asterix.external.input.record.reader.azure.parquet.AzureBlobParquetReaderFactory
 org.apache.asterix.external.input.record.reader.azure.parquet.AzureDataLakeParquetReaderFactory
-org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
\ No newline at end of file
+org.apache.asterix.external.input.record.reader.gcs.parquet.GCSParquetReaderFactory
+org.apache.asterix.external.input.record.reader.gcs.delta.GCSDeltaReaderFactory
\ No newline at end of file
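
The new trailing entry registers GCSDeltaReaderFactory with Java's standard java.util.ServiceLoader mechanism, which is how IRecordReaderFactory implementations listed under META-INF/services are discovered at runtime. A minimal sketch of that discovery (illustrative; the actual lookup code lives elsewhere in asterix-external-data):

    import java.util.ServiceLoader;

    import org.apache.asterix.external.api.IRecordReaderFactory;

    public class FactoryDiscoverySketch {
        public static void main(String[] args) {
            // Iterates every implementation listed in META-INF/services files on the
            // classpath, including the newly registered GCSDeltaReaderFactory.
            for (IRecordReaderFactory<?> factory : ServiceLoader.load(IRecordReaderFactory.class)) {
                System.out.println(factory.getClass().getName() + " -> " + factory.getRecordReaderNames());
            }
        }
    }
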

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19140
To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I4cd44ba31a22cc124e346b077a1c2798ba9ab747
Gerrit-Change-Number: 19140
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <ayush.tripa...@couchbase.com>
Gerrit-MessageType: newchange
