>From Ayush Tripathi <[email protected]>:

Ayush Tripathi has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20650?usp=email )


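Change subject: Add PDF ingestion for AWS S3 external datasets
......................................................................

Add PDF ingestion for AWS S3 external datasets

Adds a "pdf" format for the AWS S3 adapter: AwsS3PdfInputStreamFactory
submits a parsing job to an external jobs service, waits for it to
finish, and lists the resulting objects from S3; PDFDataParserFactory
parses those records as JSON.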

Change-Id: I1ad074ab96da62407aba313f8bb3c3f5f8df47d3
---
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java
A 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java
3 files changed, 386 insertions(+), 0 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/50/20650/1

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java
new file mode 100644
index 0000000..11e425e
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfInputStreamFactory.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.pdf;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.http.HttpClient;
+import java.net.http.HttpRequest;
+import java.net.http.HttpResponse;
+import java.time.Duration;
+// removed StandardCharsets import
+// removed unused ArrayList import
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.asterix.common.api.IApplicationContext;
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.AsterixInputStream;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
+import org.apache.asterix.external.input.record.reader.aws.AwsS3InputStream;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataPrefix;
+import org.apache.asterix.external.util.ExternalDataUtils;
+// removed unused S3AuthUtils import
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+// removed unused direct S3 SDK imports
+import software.amazon.awssdk.services.s3.model.S3Object;
+
+// removed unused paginator import
+
+/**
+ * Input stream factory for PDF datasets on AWS S3.
+ *
+ * <p>{@link #configure} submits a parsing job to an external HTTP "jobs" service, polls the job's status
+ * endpoint until it reaches a terminal state, then lists the S3 objects under the configured container and
+ * definition (suffixed with the dataset name when one is configured) and distributes them across partitions
+ * by size. {@link #createInputStream} reads a partition's objects with {@link AwsS3InputStream}.
+ */
+public class AwsS3PdfInputStreamFactory extends AbstractExternalInputStreamFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Jobs service used when the {@code jobs-endpoint} property is not set. */
+    private static final String DEFAULT_JOBS_ENDPOINT = "http://localhost:8000/jobs";
+    /** Overall budget for waiting on a submitted job to reach a terminal status. */
+    private static final Duration JOB_WAIT_TIMEOUT = Duration.ofSeconds(150);
+    /** Connect and per-request timeout for calls to the jobs service, so configure() cannot hang forever. */
+    private static final Duration HTTP_TIMEOUT = Duration.ofSeconds(30);
+    private static final long INITIAL_BACKOFF_MS = 500L;
+    private static final long MAX_BACKOFF_MS = 5000L;
+
+    @Override
+    public AsterixInputStream createInputStream(IExternalDataRuntimeContext context) throws HyracksDataException {
+        IExternalFilterValueEmbedder valueEmbedder = context.getValueEmbedder();
+        int partition = context.getPartition();
+        IApplicationContext ncAppCtx = (IApplicationContext) context.getTaskContext().getJobletContext()
+                .getServiceContext().getApplicationContext();
+        // Each partition reads only the object keys assigned to it by distributeWorkLoad().
+        return new AwsS3InputStream(ncAppCtx, configuration,
+                partitionWorkLoadsBasedOnSize.get(partition).getFilePaths(), valueEmbedder);
+    }
+
+    /**
+     * Submits the PDF parsing job, waits for it to finish and assigns the S3 objects to partitions.
+     *
+     * <p>Side effect: overwrites the definition entry of {@code configuration} with the effective definition
+     * (definition + "/" + dataset name) and sets the prefix-root entry before listing.
+     *
+     * @throws HyracksDataException if the jobs service cannot be reached, rejects the submission, reports a
+     *         failed/errored/cancelled job, does not finish within the wait budget, or the thread is
+     *         interrupted while waiting
+     * @throws AlgebricksException propagated from the prefix, filter and S3 listing utilities
+     */
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> configuration, IWarningCollector warningCollector,
+            IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
+        super.configure(ctx, configuration, warningCollector, filterEvaluatorFactory);
+        IApplicationContext appCtx = (IApplicationContext) ctx.getApplicationContext();
+
+        String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        String bucket = container == null ? "" : container;
+        String definition = configuration.getOrDefault(ExternalDataConstants.DEFINITION_FIELD_NAME, "");
+        String def = stripLeadingSlash(definition == null ? "" : definition.trim());
+        String datasetName = configuration.get(ExternalDataConstants.KEY_DATASET);
+        datasetName = stripTrailingSlash(stripLeadingSlash(datasetName == null ? "" : datasetName.trim()));
+
+        // When a dataset name is configured, the job output (and the listing) lives under definition/datasetName.
+        String effectiveDef = datasetName.isEmpty() ? def : (def.isEmpty() ? datasetName : def + "/" + datasetName);
+        String inputUrl = configuration.getOrDefault("input-url", toS3Url(bucket, def));
+        String outputUrl = configuration.getOrDefault("output-url", toS3Url(bucket, effectiveDef));
+        String jobsEndpoint = configuration.getOrDefault("jobs-endpoint", DEFAULT_JOBS_ENDPOINT);
+
+        // One client for the submission and every status poll.
+        HttpClient httpClient = HttpClient.newBuilder().connectTimeout(HTTP_TIMEOUT).build();
+        String jobId = submitJob(httpClient, jobsEndpoint, buildJobRequestBody(configuration, inputUrl, outputUrl));
+        if (jobId != null && !jobId.isEmpty()) {
+            awaitJobCompletion(httpClient, toStatusUrl(jobsEndpoint, jobId), jobId);
+        }
+        // Without a job id, fall back to listing whatever already exists under the effective definition.
+        // NOTE(review): the listing always uses container/effectiveDef; if "output-url" is overridden to a
+        // different location, the job's output will not be the objects listed here.
+        configuration.put(ExternalDataConstants.DEFINITION_FIELD_NAME, effectiveDef);
+        List<S3Object> files = listObjects(ctx, appCtx, configuration, warningCollector, filterEvaluatorFactory);
+        distributeWorkLoad(files, getPartitionsCount());
+    }
+
+    /** Lists the S3 objects selected by the (already updated) configuration, prefix and filter. */
+    private static List<S3Object> listObjects(IServiceContext ctx, IApplicationContext appCtx,
+            Map<String, String> configuration, IWarningCollector warningCollector,
+            IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws AlgebricksException, HyracksDataException {
+        IExternalFilterEvaluator evaluator = filterEvaluatorFactory.create(ctx, warningCollector);
+        ExternalDataPrefix externalDataPrefix = new ExternalDataPrefix(configuration);
+        configuration.put(ExternalDataPrefix.PREFIX_ROOT_FIELD_NAME, externalDataPrefix.getRoot());
+        IncludeExcludeMatcher includeExcludeMatcher = ExternalDataUtils.getIncludeExcludeMatchers(configuration);
+        return S3Utils.listS3Objects(appCtx, configuration, includeExcludeMatcher, warningCollector,
+                externalDataPrefix, evaluator);
+    }
+
+    /**
+     * Greedily assigns each object to the partition with the smallest total assigned size so far.
+     */
+    private void distributeWorkLoad(List<S3Object> fileObjects, int partitionsCount) {
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new PriorityQueue<>(partitionsCount,
+                Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+        }
+
+        for (S3Object object : fileObjects) {
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            workload.addFilePath(object.key(), object.size());
+            workloadQueue.add(workload);
+        }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+    }
+
+    /**
+     * Builds the JSON body of the job submission request. Required fields are always sent (an unset value is
+     * sent as an empty string); optional fields are sent only when configured with a non-empty value.
+     */
+    private static String buildJobRequestBody(Map<String, String> configuration, String inputUrl,
+            String outputUrl) {
+        StringBuilder json = new StringBuilder("{");
+        appendJsonField(json, "input_url", inputUrl);
+        appendJsonField(json, "output_url", outputUrl);
+        appendJsonField(json, "parsing_model", configuration.getOrDefault("parsing-model", "gpt-4.1-nano"));
+        appendJsonField(json, "page_range", configuration.getOrDefault("page-range", ""));
+        appendJsonField(json, "file_glob", configuration.getOrDefault("file-glob", "*.pdf"));
+        // NOTE(review): credentials travel in the request body; with the default http:// endpoint they are
+        // sent unencrypted. Consider requiring https for non-local endpoints.
+        appendOptionalJsonField(json, "openai_api_key", configuration.get("openai_api_key"));
+        appendOptionalJsonField(json, "aws_access_key_id", configuration.get("aws_access_key_id"));
+        appendOptionalJsonField(json, "aws_secret_access_key", configuration.get("aws_secret_access_key"));
+        // Config keys use hyphens; the request expects snake_case names.
+        appendOptionalJsonField(json, "parsing_schema", configuration.get("parsing-schema"));
+        appendOptionalJsonField(json, "parsing_guidance", configuration.get("parsing-guidance"));
+        return json.append('}').toString();
+    }
+
+    /** Appends {@code "name":"value"} (value JSON-escaped, null as ""), preceded by a comma if needed. */
+    private static void appendJsonField(StringBuilder json, String name, String value) {
+        if (json.length() > 1) {
+            json.append(',');
+        }
+        json.append('"').append(name).append("\":\"").append(escapeJson(value)).append('"');
+    }
+
+    /** Appends the field only when {@code value} is non-null and non-empty. */
+    private static void appendOptionalJsonField(StringBuilder json, String name, String value) {
+        if (value != null && !value.isEmpty()) {
+            appendJsonField(json, name, value);
+        }
+    }
+
+    /**
+     * POSTs the job request and returns the {@code job_id} from the response, or null if the response
+     * carries none.
+     *
+     * @throws HyracksDataException on transport failure, interruption, or a non-2xx response
+     */
+    private static String submitJob(HttpClient client, String jobsEndpoint, String body)
+            throws HyracksDataException {
+        HttpRequest request = newRequest(jobsEndpoint).header("Content-Type", "application/json")
+                .POST(HttpRequest.BodyPublishers.ofString(body)).build();
+        HttpResponse<String> response = send(client, request);
+        if (response.statusCode() / 100 != 2) {
+            // The request body is deliberately not included: it contains credentials.
+            throw new HyracksDataException("PDF parsing job submission to " + jobsEndpoint
+                    + " failed with HTTP status " + response.statusCode());
+        }
+        return extractJsonString(response.body(), "job_id");
+    }
+
+    /**
+     * Polls {@code statusUrl} with capped exponential backoff plus up to 25% jitter until the job reports
+     * success.
+     *
+     * @throws HyracksDataException if the job reports failed/error/cancelled, the wait budget is exhausted,
+     *         or polling fails or is interrupted
+     */
+    private static void awaitJobCompletion(HttpClient client, String statusUrl, String jobId)
+            throws HyracksDataException {
+        HttpRequest request = newRequest(statusUrl).GET().build();
+        long deadline = System.nanoTime() + JOB_WAIT_TIMEOUT.toNanos();
+        long backoffMs = INITIAL_BACKOFF_MS;
+        String status = null;
+        while (System.nanoTime() - deadline < 0) {
+            status = extractJsonString(send(client, request).body(), "status");
+            if ("completed".equalsIgnoreCase(status) || "success".equalsIgnoreCase(status)) {
+                return;
+            }
+            if ("failed".equalsIgnoreCase(status) || "error".equalsIgnoreCase(status)
+                    || "cancelled".equalsIgnoreCase(status)) {
+                throw new HyracksDataException("PDF parsing job " + jobId + " ended with status '" + status + "'");
+            }
+            sleepWithJitter(backoffMs);
+            backoffMs = Math.min(backoffMs * 2, MAX_BACKOFF_MS);
+        }
+        throw new HyracksDataException("PDF parsing job " + jobId + " did not complete within "
+                + JOB_WAIT_TIMEOUT.toSeconds() + "s (last status: " + status + ")");
+    }
+
+    /** Sleeps for {@code backoffMs} plus a random extra of up to 25%; an interrupt is re-flagged and rethrown. */
+    private static void sleepWithJitter(long backoffMs) throws HyracksDataException {
+        long jitter = backoffMs / 4;
+        long sleepMs = backoffMs + (jitter == 0 ? 0 : ThreadLocalRandom.current().nextLong(jitter + 1));
+        try {
+            Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /** Sends {@code request}, converting transport failures and interrupts into HyracksDataException. */
+    private static HttpResponse<String> send(HttpClient client, HttpRequest request) throws HyracksDataException {
+        try {
+            return client.send(request, HttpResponse.BodyHandlers.ofString());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /** Creates a request builder with the standard timeout; a malformed URL becomes HyracksDataException. */
+    private static HttpRequest.Builder newRequest(String url) throws HyracksDataException {
+        try {
+            return HttpRequest.newBuilder(URI.create(url)).timeout(HTTP_TIMEOUT);
+        } catch (IllegalArgumentException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    /** Derives {@code <endpoint-without-query>/<jobId>/status} from the jobs endpoint. */
+    private static String toStatusUrl(String jobsEndpoint, String jobId) {
+        int queryStart = jobsEndpoint.indexOf('?');
+        String base = queryStart >= 0 ? jobsEndpoint.substring(0, queryStart) : jobsEndpoint;
+        return (base.endsWith("/") ? base : base + "/") + jobId + "/status";
+    }
+
+    /** Returns {@code s3://bucket} or {@code s3://bucket/path} when {@code path} is non-empty. */
+    private static String toS3Url(String bucket, String path) {
+        return "s3://" + bucket + (path.isEmpty() ? "" : "/" + path);
+    }
+
+    private static String stripLeadingSlash(String s) {
+        return s.startsWith("/") ? s.substring(1) : s;
+    }
+
+    private static String stripTrailingSlash(String s) {
+        return s.endsWith("/") ? s.substring(0, s.length() - 1) : s;
+    }
+
+    /**
+     * Escapes {@code s} for use inside a JSON string literal: quote, backslash and all control characters
+     * below U+0020 (RFC 8259 section 7). Returns "" for null.
+     */
+    private static String escapeJson(String s) {
+        if (s == null) {
+            return "";
+        }
+        StringBuilder out = new StringBuilder(s.length() + 16);
+        for (int i = 0; i < s.length(); i++) {
+            char c = s.charAt(i);
+            switch (c) {
+                case '"':
+                    out.append("\\\"");
+                    break;
+                case '\\':
+                    out.append("\\\\");
+                    break;
+                case '\n':
+                    out.append("\\n");
+                    break;
+                case '\r':
+                    out.append("\\r");
+                    break;
+                case '\t':
+                    out.append("\\t");
+                    break;
+                case '\b':
+                    out.append("\\b");
+                    break;
+                case '\f':
+                    out.append("\\f");
+                    break;
+                default:
+                    if (c < 0x20) {
+                        out.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        out.append(c);
+                    }
+            }
+        }
+        return out.toString();
+    }
+
+    /**
+     * Returns the decoded string value of the first {@code "field": "..."} pair in {@code json}. Returns null if
+     * the field is absent, its value is empty or not a string, or the value is unterminated. This is a
+     * lightweight scan, not a full JSON parser: it does not distinguish nesting levels.
+     */
+    private static String extractJsonString(String json, String field) {
+        if (json == null || field == null) {
+            return null;
+        }
+        Matcher m = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*\"").matcher(json);
+        if (!m.find()) {
+            return null;
+        }
+        StringBuilder value = new StringBuilder();
+        for (int i = m.end(); i < json.length(); i++) {
+            char c = json.charAt(i);
+            if (c == '"') {
+                return value.length() == 0 ? null : value.toString();
+            }
+            if (c != '\\') {
+                value.append(c);
+                continue;
+            }
+            if (++i >= json.length()) {
+                break;
+            }
+            char escaped = json.charAt(i);
+            switch (escaped) {
+                case 'n':
+                    value.append('\n');
+                    break;
+                case 'r':
+                    value.append('\r');
+                    break;
+                case 't':
+                    value.append('\t');
+                    break;
+                case 'b':
+                    value.append('\b');
+                    break;
+                case 'f':
+                    value.append('\f');
+                    break;
+                case 'u':
+                    if (i + 4 >= json.length()) {
+                        return null;
+                    }
+                    try {
+                        value.append((char) Integer.parseInt(json.substring(i + 1, i + 5), 16));
+                    } catch (NumberFormatException e) {
+                        return null;
+                    }
+                    i += 4;
+                    break;
+                default:
+                    // \" \\ \/ (and any other escaped character) decode to the character itself.
+                    value.append(escaped);
+            }
+        }
+        // Unterminated string value.
+        return null;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java
new file mode 100644
index 0000000..36c92b0
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/pdf/AwsS3PdfReaderFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.pdf;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory;
+import org.apache.asterix.external.util.ExternalDataConstants;
+
+/**
+ * Stream record reader factory for the PDF format.
+ *
+ * <p>It answers to {@link ExternalDataConstants#KEY_ADAPTER_NAME_AWS_S3} and advertises only
+ * {@link ExternalDataConstants#FORMAT_PDF}. Its byte streams come from {@link AwsS3PdfInputStreamFactory}.
+ */
+public class AwsS3PdfReaderFactory extends StreamRecordReaderFactory {
+
+    private static final long serialVersionUID = 1L;
+
+    /** Reader names this factory is registered under. */
+    private static final List<String> READER_NAMES =
+            Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+
+    /** Formats this factory can read. */
+    private static final Set<String> SUPPORTED_FORMATS = Collections.singleton(ExternalDataConstants.FORMAT_PDF);
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return SUPPORTED_FORMATS;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return READER_NAMES;
+    }
+
+    /** Installs a fresh {@link AwsS3PdfInputStreamFactory}; the configuration map is not consulted here. */
+    @Override
+    protected void setStreamFactory(java.util.Map<String, String> config) {
+        this.streamFactory = new AwsS3PdfInputStreamFactory();
+    }
+}
\ No newline at end of file
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java
new file mode 100644
index 0000000..850927e
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/PDFDataParserFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.JSONDataParser;
+import org.apache.asterix.om.types.ARecordType;
+
+/**
+ * Parser factory registered under the "pdf" format name.
+ *
+ * <p>Both stream and record parsers are {@link JSONDataParser} instances, so the incoming data is expected to be
+ * JSON (presumably the output of the external PDF parsing job — NOTE(review): confirm).
+ */
+public class PDFDataParserFactory extends AbstractGenericDataParserFactory<char[]> {
+
+    private static final long serialVersionUID = 1L;
+    private static final List<String> PARSER_FORMAT = Collections.singletonList("pdf");
+    /**
+     * Shared Jackson factory. Jackson factories are thread-safe and intended to be reused rather than created
+     * per parser. The field is static so it is not part of this Serializable factory's state.
+     */
+    private static final com.fasterxml.jackson.core.JsonFactory JSON_FACTORY =
+            new com.fasterxml.jackson.core.JsonFactory();
+
+    @Override
+    public IStreamDataParser createInputStreamParser(IExternalDataRuntimeContext context) {
+        return createParser(context);
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        // This format carries no meta record; nothing to store.
+    }
+
+    @Override
+    public List<String> getParserFormats() {
+        return PARSER_FORMAT;
+    }
+
+    @Override
+    public IRecordDataParser<char[]> createRecordParser(IExternalDataRuntimeContext context) {
+        return createParser(context);
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return char[].class;
+    }
+
+    /** Creates a JSON parser for the configured record type that uses the shared Jackson factory. */
+    private JSONDataParser createParser(IExternalDataRuntimeContext context) {
+        return new JSONDataParser(recordType, JSON_FACTORY, context);
+    }
+}

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20650?usp=email
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings?usp=email

Gerrit-MessageType: newchange
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: I1ad074ab96da62407aba313f8bb3c3f5f8df47d3
Gerrit-Change-Number: 20650
Gerrit-PatchSet: 1
Gerrit-Owner: Ayush Tripathi <[email protected]>

Reply via email to