lidavidm commented on code in PR #34227:
URL: https://github.com/apache/arrow/pull/34227#discussion_r1192693120


##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected BufferAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    //VARCHAR(150) -> is mapping to -> {ARROW:extension:name=varchar, 
ARROW:extension:metadata=varchar{length:150}}
+    Map<String, String> metadataName = new HashMap<>();
+    metadataName.put("ARROW:extension:name", "varchar");
+    metadataName.put("ARROW:extension:metadata", "varchar{length:150}");

Review Comment:
   Is the metadata actually necessary?



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+  private RootAllocator allocator = null;

Review Comment:
   I think I've stated this before, but do not use RootAllocator as a type; use 
BufferAllocator.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected BufferAllocator rootAllocator() {
+    return allocator;
+  }

Review Comment:
   nit: why the extra getter? There's no need.



##########
java/dataset/src/test/java/org/apache/arrow/dataset/substrait/TestAceroSubstraitConsumer.java:
##########
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.arrow.dataset.ParquetWriteSupport;
+import org.apache.arrow.dataset.TestDataset;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
+import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.ScanOptions;
+import org.apache.arrow.dataset.scanner.Scanner;
+import org.apache.arrow.dataset.source.Dataset;
+import org.apache.arrow.dataset.source.DatasetFactory;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestAceroSubstraitConsumer extends TestDataset {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+  public static final String AVRO_SCHEMA_USER = "user.avsc";
+  private RootAllocator allocator = null;
+
+  @Before
+  public void setUp() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  @After
+  public void tearDown() {
+    allocator.close();
+  }
+
+  protected BufferAllocator rootAllocator() {
+    return allocator;
+  }
+
+  @Test
+  public void testRunQueryLocalFiles() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    //VARCHAR(150) -> is mapping to -> {ARROW:extension:name=varchar, 
ARROW:extension:metadata=varchar{length:150}}
+    Map<String, String> metadataName = new HashMap<>();
+    metadataName.put("ARROW:extension:name", "varchar");
+    metadataName.put("ARROW:extension:metadata", "varchar{length:150}");
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        new Field("NAME", new FieldType(true, new ArrowType.Utf8(), null, 
metadataName), null)
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, 
"c");
+    try (ArrowReader arrowReader = new AceroSubstraitConsumer(rootAllocator())
+        .runQuery(
+            planReplaceLocalFileURI(
+                new 
String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+                    
.getResource("substrait/local_files_users.json").toURI()))),
+                writeSupport.getOutputURI()
+            )
+        )
+    ) {
+      assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+      int rowcount = 0;
+      while (arrowReader.loadNextBatch()) {
+        rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+      }
+      assertEquals(3, rowcount);
+    }
+  }
+
+  @Test
+  public void testRunQueryNamedTableNation() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, 
"c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS", reader);
+      try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new 
String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI()))),
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        assertEquals(arrowReader.getVectorSchemaRoot().getSchema(), schema);
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testRunQueryNamedTableNationWithException() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, 
"c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS_INVALID_MAP", reader);
+      try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(rootAllocator()).runQuery(
+          new 
String(Files.readAllBytes(Paths.get(TestAceroSubstraitConsumer.class.getClassLoader()
+              .getResource("substrait/named_table_users.json").toURI()))),
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  @Test
+  public void testRunBinaryQueryNamedTableNation() throws Exception {
+    //Query:
+    //SELECT id, name FROM Users
+    //Isthmus:
+    //./isthmus-macOS-0.7.0  -c "CREATE TABLE USERS ( id INT NOT NULL, name 
VARCHAR(150));" "SELECT id, name FROM Users"
+    final Schema schema = new Schema(Arrays.asList(
+        Field.nullable("ID", new ArrowType.Int(32, true)),
+        Field.nullable("NAME", new ArrowType.Utf8())
+    ), Collections.emptyMap());
+    // Base64.getEncoder().encodeToString(plan.toByteArray());
+    String binaryPlan =
+        "Gl8SXQpROk8KBhIECgICAxIvCi0KAgoAEh4KAklECgROQU1FEhIKBCoCEAEKC" +
+            
"LIBBQiWARgBGAI6BwoFVVNFUlMaCBIGCgISACIAGgoSCAoEEgIIASIAEgJJRBIETkFNRQ==";
+    ParquetWriteSupport writeSupport = ParquetWriteSupport
+        .writeTempFile(AVRO_SCHEMA_USER, TMP.newFolder(), 1, "a", 11, "b", 21, 
"c");
+    ScanOptions options = new ScanOptions(/*batchSize*/ 32768);
+    try (
+        DatasetFactory datasetFactory = new 
FileSystemDatasetFactory(rootAllocator(), NativeMemoryPool.getDefault(),
+            FileFormat.PARQUET, writeSupport.getOutputURI());
+        Dataset dataset = datasetFactory.finish();
+        Scanner scanner = dataset.newScan(options);
+        ArrowReader reader = scanner.scanBatches()
+    ) {
+      // map table to reader
+      Map<String, ArrowReader> mapTableToArrowReader = new HashMap<>();
+      mapTableToArrowReader.put("USERS", reader);
+      // get binary plan
+      byte[] plan = Base64.getDecoder().decode(binaryPlan);
+      ByteBuffer substraitPlan = ByteBuffer.allocateDirect(plan.length);
+      substraitPlan.put(plan);
+      // run query
+      try (ArrowReader arrowReader = new 
AceroSubstraitConsumer(rootAllocator()).runQuery(
+          substraitPlan,
+          mapTableToArrowReader
+      )) {
+        assertEquals(schema, arrowReader.getVectorSchemaRoot().getSchema());
+        int rowcount = 0;
+        while (arrowReader.loadNextBatch()) {
+          rowcount += arrowReader.getVectorSchemaRoot().getRowCount();
+        }
+        assertEquals(3, rowcount);
+      }
+    }
+  }
+
+  private static String planReplaceLocalFileURI(String plan, String uri) {
+    StringBuilder builder = new StringBuilder(plan);
+    builder.replace(builder.indexOf("FILENAME_PLACEHOLDER"),
+        builder.indexOf("FILENAME_PLACEHOLDER") + 
"FILENAME_PLACEHOLDER".length(), uri);
+    return builder.toString();
+  }

Review Comment:
   Why aren't we just using String.replace?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/JniWrapper.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.dataset.jni.JniLoader;
+
+/**
+ * Class that contains Native methods to call Acero C++ Substrait API. It 
internally depends on C++ function
+ * arrow::engine::ExecuteSerializedPlan. Currently supported input parameters 
supported are:
+ * <pre>
+ * - arrow::Buffer: Susbtrait Plan (JSON or Binary format).
+ * - arrow::engine::ConversionOptions: Mapping for 
arrow::engine::NamedTableProvider.
+ * </pre>
+ */
+final class JniWrapper {
+  private static final JniWrapper INSTANCE = new JniWrapper();
+
+  private JniWrapper() {
+  }
+
+  public static JniWrapper get() {
+    JniLoader.get().ensureLoaded();
+    return INSTANCE;
+  }
+
+  /**
+   * Consume the JSON Substrait Plan that contains Local Files and export the 
RecordBatchReader into
+   * C-Data Interface ArrowArrayStream.
+   *
+   * @param planInput the JSON Substrait plan.
+   * @param memoryAddressOutput the memory address where RecordBatchReader is 
exported.
+   */
+  public native void executeSerializedPlanLocalFiles(String planInput, long 
memoryAddressOutput);

Review Comment:
   This was never addressed?



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -261,6 +264,54 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   default_memory_pool_id = -1L;
 }
 
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data.  This function will eagerly read all
+/// tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {

Review Comment:
   Can this be a const reference?



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations 
supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Run Substrait plan.
+   *
+   * @param plan The JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Run Substrait plan.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a 
Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> 
namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Run Substrait plan.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                              for the table. Contains the Table Name to 
Query as a Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> 
namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  private ArrowReader execute(String plan, Map<String, ArrowReader> 
namedTables) throws Exception {
+    List<ArrowArrayStream> arrowArrayStream = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = 
ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = 
getMapTableToMemoryAddress(namedTables, arrowArrayStream);
+      JniWrapper.get().executeSerializedPlanNamedTables(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      for (ArrowArrayStream stream : arrowArrayStream) {
+        stream.close();
+      }
+    }
+  }
+
+  private ArrowReader execute(ByteBuffer plan, Map<String, ArrowReader> 
namedTables) throws Exception {
+    List<ArrowArrayStream> arrowArrayStream = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = 
ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = 
getMapTableToMemoryAddress(namedTables, arrowArrayStream);
+      JniWrapper.get().executeSerializedPlanNamedTables(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      try {
+        AutoCloseables.close(arrowArrayStream);
+      } catch (RuntimeException e) {
+        throw e;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }

Review Comment:
   Why the extra catches?



##########
java/dataset/src/main/cpp/jni_wrapper.cc:
##########
@@ -261,6 +264,54 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
   default_memory_pool_id = -1L;
 }
 
+/// Unpack the named tables passed through JNI.
+///
+/// Named tables are encoded as a string array, where every two elements
+/// encode (1) the table name and (2) the address of an ArrowArrayStream
+/// containing the table data.  This function will eagerly read all
+/// tables into Tables.
+std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
LoadNamedTables(JNIEnv* env, jobjectArray& str_array) {
+  std::unordered_map<std::string, std::shared_ptr<arrow::Table>> 
map_table_to_record_batch_reader;
+  int length = env->GetArrayLength(str_array);
+  if (length % 2 != 0) {
+    JniThrow("Can not map odd number of array elements to key/value pairs");
+  }
+  std::shared_ptr<arrow::Table> output_table;
+  for (int pos = 0; pos < length; pos++) {
+    auto j_string_key = 
reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    pos++;
+    auto j_string_value = 
reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, pos));
+    uintptr_t memory_address = 0;
+    try {
+      memory_address = std::stol(JStringToCString(env, j_string_value));
+    } catch(const std::runtime_error& re) {
+      JniThrow("Failed to parse memory address from string value. Runtime 
error: " + std::string(re.what()));
+    } catch(const std::exception& ex) {
+      JniThrow("Failed to parse memory address from string value. Error: " + 
std::string(ex.what()));

Review Comment:
   std::runtime_error subclasses std::exception, so catching std::exception alone would cover both cases.



##########
java/dataset/src/main/java/org/apache/arrow/dataset/substrait/AceroSubstraitConsumer.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.dataset.substrait;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.arrow.c.ArrowArrayStream;
+import org.apache.arrow.c.Data;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.ipc.ArrowReader;
+
+/**
+ * Class to expose Java Substrait API for end users, currently operations 
supported are only to Consume Substrait Plan
+ * in Plan format (JSON) or Binary format (ByteBuffer).
+ */
+public final class AceroSubstraitConsumer {
+  private final BufferAllocator allocator;
+
+  public AceroSubstraitConsumer(BufferAllocator allocator) {
+    this.allocator = allocator;
+  }
+
+  /**
+   * Run Substrait plan.
+   *
+   * @param plan The JSON Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Run Substrait plan.
+   *
+   * @param plan The JSON Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                    for the table. Contains the Table Name to Query as a 
Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(String plan, Map<String, ArrowReader> 
namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  /**
+   * Run Substrait plan.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan) throws Exception {
+    return runQuery(plan, Collections.emptyMap());
+  }
+
+  /**
+   * Read binary Substrait plan, execute and return an ArrowReader to read 
Schema and ArrowRecordBatches.
+   *
+   * @param plan                  the binary Substrait plan.
+   * @param namedTables A mapping of named tables referenced by the plan to an 
ArrowReader providing the data
+   *                              for the table. Contains the Table Name to 
Query as a Key and ArrowReader as a Value.
+   * <pre>{@code ArrowReader nationReader = scanner.scanBatches();
+   * Map<String, ArrowReader> namedTables = new HashMap<>();
+   * namedTables.put("NATION", nationReader);}</pre>
+   * @return the ArrowReader to iterate for record batches.
+   */
+  public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> 
namedTables) throws Exception {
+    return execute(plan, namedTables);
+  }
+
+  private ArrowReader execute(String plan, Map<String, ArrowReader> 
namedTables) throws Exception {
+    List<ArrowArrayStream> arrowArrayStream = new ArrayList<>();
+    try (
+        ArrowArrayStream streamOutput = 
ArrowArrayStream.allocateNew(this.allocator)
+    ) {
+      String[] mapTableToMemoryAddress = 
getMapTableToMemoryAddress(namedTables, arrowArrayStream);
+      JniWrapper.get().executeSerializedPlanNamedTables(
+          plan,
+          mapTableToMemoryAddress,
+          streamOutput.memoryAddress()
+      );
+      return Data.importArrayStream(this.allocator, streamOutput);
+    } finally {
+      for (ArrowArrayStream stream : arrowArrayStream) {
+        stream.close();
+      }

Review Comment:
   Use AutoCloseables here instead of closing each stream in a manual loop.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to