lidavidm commented on code in PR #34227: URL: https://github.com/apache/arrow/pull/34227#discussion_r1145308459
########## java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitAceroConsumer.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan + * in Plan format (JSON) or Binary format (ByteBuffer). + */ +public final class SubstraitAceroConsumer { Review Comment: should be AceroSubstraitConsumer ########## java/dataset/src/main/cpp/jni_wrapper.cc: ########## @@ -578,3 +601,102 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner)); JNI_METHOD_END() } + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan))); + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanLocalFiles + * Signature: (Ljava/nio/ByteBuffer;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanLocalFiles__Ljava_nio_ByteBuffer_2J ( + JNIEnv* env, jobject, jobject plan, jlong c_arrow_array_stream_address_out) { + JNI_METHOD_START + auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address_out); + // mapping arrow::Buffer + auto *buff = reinterpret_cast<jbyte*>(env->GetDirectBufferAddress(plan)); + int length = env->GetDirectBufferCapacity(plan); + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::AllocateBuffer(length)); + std::memcpy(buffer->mutable_data(), buff, length); + // execute plan + std::shared_ptr<arrow::RecordBatchReader> reader = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer)); + JniAssertOkOrThrow(arrow::ExportRecordBatchReader(reader, arrow_stream)); + JNI_METHOD_END() +} + +/* + * Class: org_apache_arrow_dataset_substrait_JniWrapper + * Method: executeSerializedPlanNamedTables + * Signature: (Ljava/lang/String;[Ljava/lang/String;J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_substrait_JniWrapper_executeSerializedPlanNamedTables__Ljava_lang_String_2_3Ljava_lang_String_2J ( + JNIEnv* env, jobject, jstring plan, jobjectArray table_to_memory_address_input, jlong memory_address_output) { + JNI_METHOD_START + // get mapping of table name to memory address + std::unordered_map<std::string, std::shared_ptr<arrow::Table>> map_table_to_reader = ToMapTableToArrowReader(env, table_to_memory_address_input); + // create table provider + arrow::engine::NamedTableProvider table_provider = [&map_table_to_reader](const std::vector<std::string>& names, const arrow::Schema&) { + std::shared_ptr<arrow::Table> output_table; + for (const auto& name : names) { + output_table = map_table_to_reader[name]; + } + std::shared_ptr<arrow::compute::ExecNodeOptions> options = + std::make_shared<arrow::compute::TableSourceNodeOptions>(std::move(output_table)); + return arrow::compute::Declaration("table_source", {}, options, "java_source"); + }; + arrow::engine::ConversionOptions conversion_options; + conversion_options.named_table_provider = std::move(table_provider); + // execute plan + std::shared_ptr<arrow::Buffer> buffer = JniGetOrThrow(arrow::engine::SerializeJsonPlan(JStringToCString(env, plan))); + std::shared_ptr<arrow::RecordBatchReader> readerOut = JniGetOrThrow(arrow::engine::ExecuteSerializedPlan(*buffer, NULLPTR, NULLPTR, conversion_options)); Review Comment: NULLPTR is only for headers, use nullptr ########## java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitAceroConsumer.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan + * in Plan format (JSON) or Binary format (ByteBuffer). + */ +public final class SubstraitAceroConsumer { + private final BufferAllocator allocator; + + public SubstraitAceroConsumer(BufferAllocator allocator) { + this.allocator = allocator; + } + + /** + * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan The JSON Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. + */ + public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) { + if (namedTables.isEmpty()) { + return getArrowReader(plan); + } else { + return getArrowReader(plan, namedTables); + } + } + + /** + * Read binary Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan the binary Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. + */ + public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> namedTables) { + if (namedTables.isEmpty()) { + return getArrowReader(plan); + } else { + return getArrowReader(plan, namedTables); + } + } + + private ArrowReader getArrowReader(Object plan) { Review Comment: I don't see the point of this overload; what is wrong with executing a plan with an empty set of named tables? ########## cpp/thirdparty/versions.txt: ########## @@ -54,7 +54,7 @@ ARROW_AWS_LC_BUILD_SHA256_CHECKSUM=ae96a3567161552744fc0cae8b4d68ed88b1ec0f3d3c9 ARROW_AWSSDK_BUILD_VERSION=1.10.55 ARROW_AWSSDK_BUILD_SHA256_CHECKSUM=2d552fb1a84bef4a9b65e34aa7031851ed2aef5319e02cc6e4cb735c48aa30de ARROW_BOOST_BUILD_VERSION=1.81.0 -ARROW_BOOST_BUILD_SHA256_CHECKSUM=9e0ffae35528c35f90468997bc8d99500bf179cbae355415a89a600c38e13574 +ARROW_BOOST_BUILD_SHA256_CHECKSUM=205666dea9f6a7cfed87c7a6dfbeb52a2c1b9de55712c9c1a87735d7181452b6 Review Comment: Why are we changing this? ########## java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitAceroConsumer.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan + * in Plan format (JSON) or Binary format (ByteBuffer). + */ +public final class SubstraitAceroConsumer { + private final BufferAllocator allocator; + + public SubstraitAceroConsumer(BufferAllocator allocator) { + this.allocator = allocator; + } + + /** + * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan The JSON Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. + */ + public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) { + if (namedTables.isEmpty()) { + return getArrowReader(plan); + } else { + return getArrowReader(plan, namedTables); + } + } + + /** + * Read binary Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan the binary Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. + */ + public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> namedTables) { + if (namedTables.isEmpty()) { + return getArrowReader(plan); + } else { + return getArrowReader(plan, namedTables); + } + } + + private ArrowReader getArrowReader(Object plan) { + try (ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(this.allocator)) { + if (plan instanceof String) { + JniWrapper.get().executeSerializedPlanLocalFiles((String) plan, arrowArrayStream.memoryAddress()); + } else if (plan instanceof ByteBuffer) { + System.out.println("plan--=<"); + System.out.println(plan); Review Comment: Stray prints ########## java/dataset/src/main/java/org/apache/arrow/dataset/substrait/SubstraitAceroConsumer.java: ########## @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.substrait; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.c.ArrowArrayStream; +import org.apache.arrow.c.Data; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; + +/** + * Class to expose Java Substrait API for end users, currently operations supported are only to Consume Substrait Plan + * in Plan format (JSON) or Binary format (ByteBuffer). + */ +public final class SubstraitAceroConsumer { + private final BufferAllocator allocator; + + public SubstraitAceroConsumer(BufferAllocator allocator) { + this.allocator = allocator; + } + + /** + * Read plain-text Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan The JSON Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. + */ + public ArrowReader runQuery(String plan, Map<String, ArrowReader> namedTables) { + if (namedTables.isEmpty()) { + return getArrowReader(plan); + } else { + return getArrowReader(plan, namedTables); + } + } + + /** + * Read binary Substrait plan, execute and return an ArrowReader to read Schema and ArrowRecordBatches. + * Needed to define a mapping name of Tables and theirs ArrowReader representation. + * + * @param plan the binary Substrait plan. + * @param namedTables A mapping of named tables referenced by the plan to an ArrowReader providing the data + * for the table. Contains the Table Name to Query as a Key and ArrowReader as a Value. + * <pre>{@code ArrowReader nationReader = scanner.scanBatches(); + * Map<String, ArrowReader> namedTables = new HashMap<>(); + * namedTables.put("NATION", nationReader);}</pre> + * @return the ArrowReader to iterate for record batches. + */ + public ArrowReader runQuery(ByteBuffer plan, Map<String, ArrowReader> namedTables) { + if (namedTables.isEmpty()) { + return getArrowReader(plan); + } else { + return getArrowReader(plan, namedTables); + } + } + + private ArrowReader getArrowReader(Object plan) { + try (ArrowArrayStream arrowArrayStream = ArrowArrayStream.allocateNew(this.allocator)) { + if (plan instanceof String) { + JniWrapper.get().executeSerializedPlanLocalFiles((String) plan, arrowArrayStream.memoryAddress()); + } else if (plan instanceof ByteBuffer) { + System.out.println("plan--=<"); + System.out.println(plan); + ByteBuffer demo = (ByteBuffer) plan; + JniWrapper.get().executeSerializedPlanLocalFiles(demo, arrowArrayStream.memoryAddress()); + } + return Data.importArrayStream(this.allocator, arrowArrayStream); + } + } + + private ArrowReader getArrowReader(Object plan, Map<String, ArrowReader> namedTables) { + List<ArrowArrayStream> listStreamInput = new ArrayList<>(); + try ( + ArrowArrayStream streamOutput = ArrowArrayStream.allocateNew(this.allocator) + ) { + String[] mapTableToMemoryAddress = getMapTableToMemoryAddress(namedTables, listStreamInput); + if (plan instanceof String) { Review Comment: Can we avoid instanceof chains? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
