This is an automated email from the ASF dual-hosted git repository.

ravindra pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
     new 7b0335f ARROW-5818: [Java][Gandiva] support varlen output vectors
7b0335f is described below

commit 7b0335ffb8cae84e401ce62e5d6d199407a8c621
Author: Pindikura Ravindra <ravin...@dremio.com>
AuthorDate: Tue Jul 2 18:46:28 2019 +0530

    ARROW-5818: [Java][Gandiva] support varlen output vectors

    callback to java for resizing varlen vectors

    Author: Pindikura Ravindra <ravin...@dremio.com>

    Closes #4771 from pravindra/jvarlen and squashes the following commits:

    d9954e865 <Pindikura Ravindra> add check for null expander
    b710a7792 <Pindikura Ravindra> ARROW-5818: support varlen output vectors
---
 cpp/src/gandiva/jni/jni_common.cc | 106 +++++++++++++++++----
 .../apache/arrow/gandiva/evaluator/JniWrapper.java | 3 +-
 .../apache/arrow/gandiva/evaluator/Projector.java | 22 ++++-
 .../arrow/gandiva/evaluator/VectorExpander.java | 69 ++++++++++++++
 .../arrow/gandiva/evaluator/ProjectorTest.java | 2 -
 5 files changed, 177 insertions(+), 25 deletions(-)

diff --git a/cpp/src/gandiva/jni/jni_common.cc b/cpp/src/gandiva/jni/jni_common.cc
index 2ff4bc9..eeaaca7 100644
--- a/cpp/src/gandiva/jni/jni_common.cc
+++ b/cpp/src/gandiva/jni/jni_common.cc
@@ -72,6 +72,11 @@ jclass configuration_builder_class_;
 // refs for self.
static jclass gandiva_exception_; +static jclass vector_expander_class_; +static jclass vector_expander_ret_class_; +static jmethodID vector_expander_method_; +static jfieldID vector_expander_ret_address_; +static jfieldID vector_expander_ret_capacity_; // module maps gandiva::IdToModuleMap<std::shared_ptr<ProjectorHolder>> projector_modules_; @@ -91,10 +96,27 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) { jclass localExceptionClass = env->FindClass("org/apache/arrow/gandiva/exceptions/GandivaException"); gandiva_exception_ = (jclass)env->NewGlobalRef(localExceptionClass); + env->ExceptionDescribe(); env->DeleteLocalRef(localExceptionClass); - env->ExceptionDescribe(); + jclass local_expander_class = + env->FindClass("org/apache/arrow/gandiva/evaluator/VectorExpander"); + vector_expander_class_ = (jclass)env->NewGlobalRef(local_expander_class); + env->DeleteLocalRef(local_expander_class); + + vector_expander_method_ = env->GetMethodID( + vector_expander_class_, "expandOutputVectorAtIndex", + "(II)Lorg/apache/arrow/gandiva/evaluator/VectorExpander$ExpandResult;"); + jclass local_expander_ret_class = + env->FindClass("org/apache/arrow/gandiva/evaluator/VectorExpander$ExpandResult"); + vector_expander_ret_class_ = (jclass)env->NewGlobalRef(local_expander_ret_class); + env->DeleteLocalRef(local_expander_ret_class); + + vector_expander_ret_address_ = + env->GetFieldID(vector_expander_ret_class_, "address", "J"); + vector_expander_ret_capacity_ = + env->GetFieldID(vector_expander_ret_class_, "capacity", "I"); return JNI_VERSION; } @@ -103,6 +125,8 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) { vm->GetEnv(reinterpret_cast<void**>(&env), JNI_VERSION); env->DeleteGlobalRef(configuration_builder_class_); env->DeleteGlobalRef(gandiva_exception_); + env->DeleteGlobalRef(vector_expander_class_); + env->DeleteGlobalRef(vector_expander_ret_class_); } DataTypePtr ProtoTypeToTime32(const types::ExtGandivaType& ext_type) { @@ -637,27 +661,62 @@ err_out: /// class 
JavaResizableBuffer : public arrow::ResizableBuffer { public: - JavaResizableBuffer(uint8_t* buffer, int32_t len) : ResizableBuffer(buffer, len) { + JavaResizableBuffer(JNIEnv* env, jobject jexpander, int32_t vector_idx, uint8_t* buffer, + int32_t len) + : ResizableBuffer(buffer, len), + env_(env), + jexpander_(jexpander), + vector_idx_(vector_idx) { size_ = 0; } - Status Resize(const int64_t new_size, bool shrink_to_fit) override { - if (shrink_to_fit == true) { - return Status::NotImplemented("shrink not implemented"); - } else if (new_size < capacity()) { - size_ = new_size; - return Status::OK(); - } else { - // TODO: callback into java to re-alloc the buffer. - return Status::NotImplemented("buffer expand not implemented"); - } - } + Status Resize(const int64_t new_size, bool shrink_to_fit) override; Status Reserve(const int64_t new_capacity) override { return Status::NotImplemented("reserve not implemented"); } + + private: + JNIEnv* env_; + jobject jexpander_; + int32_t vector_idx_; }; +Status JavaResizableBuffer::Resize(const int64_t new_size, bool shrink_to_fit) { + if (shrink_to_fit == true) { + return Status::NotImplemented("shrink not implemented"); + } + + if (ARROW_PREDICT_TRUE(new_size < capacity())) { + // no need to expand. 
+ size_ = new_size; + return Status::OK(); + } + + if (new_size > INT32_MAX) { + return Status::OutOfMemory("java supports buffer sizes upto 2GB only"); + } + + // callback into java to expand the buffer + int32_t updated_capacity = static_cast<int32_t>(new_size); + jobject ret = env_->CallObjectMethod(jexpander_, vector_expander_method_, vector_idx_, + updated_capacity); + if (env_->ExceptionCheck()) { + env_->ExceptionDescribe(); + env_->ExceptionClear(); + return Status::OutOfMemory("buffer expand failed in java"); + } + + jlong ret_address = env_->GetLongField(ret, vector_expander_ret_address_); + jint ret_capacity = env_->GetIntField(ret, vector_expander_ret_capacity_); + DCHECK_GE(ret_capacity, updated_capacity); + + data_ = mutable_data_ = reinterpret_cast<uint8_t*>(ret_address); + size_ = new_size; + capacity_ = ret_capacity; + return Status::OK(); +} + #define CHECK_OUT_BUFFER_IDX_AND_BREAK(idx, len) \ if (idx >= len) { \ status = gandiva::Status::Invalid("insufficient number of out_buf_addrs"); \ @@ -666,9 +725,10 @@ class JavaResizableBuffer : public arrow::ResizableBuffer { JNIEXPORT void JNICALL Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( - JNIEnv* env, jobject cls, jlong module_id, jint num_rows, jlongArray buf_addrs, - jlongArray buf_sizes, jint sel_vec_type, jint sel_vec_rows, jlong sel_vec_addr, - jlong sel_vec_size, jlongArray out_buf_addrs, jlongArray out_buf_sizes) { + JNIEnv* env, jobject object, jobject jexpander, jlong module_id, jint num_rows, + jlongArray buf_addrs, jlongArray buf_sizes, jint sel_vec_type, jint sel_vec_rows, + jlong sel_vec_addr, jlong sel_vec_size, jlongArray out_buf_addrs, + jlongArray out_buf_sizes) { Status status; std::shared_ptr<ProjectorHolder> holder = projector_modules_.Lookup(module_id); if (holder == nullptr) { @@ -735,6 +795,7 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( ArrayDataVector output; int buf_idx = 0; int sz_idx = 0; + int output_vector_idx = 0; for 
(FieldPtr field : ret_types) { std::vector<std::shared_ptr<arrow::Buffer>> buffers; @@ -755,13 +816,24 @@ Java_org_apache_arrow_gandiva_evaluator_JniWrapper_evaluateProjector( uint8_t* value_buf = reinterpret_cast<uint8_t*>(out_bufs[buf_idx++]); jlong data_sz = out_sizes[sz_idx++]; if (arrow::is_binary_like(field->type()->id())) { - buffers.push_back(std::make_shared<JavaResizableBuffer>(value_buf, data_sz)); + if (jexpander == nullptr) { + status = Status::Invalid( + "expression has variable len output columns, but the expander object is " + "null"); + break; + } + buffers.push_back(std::make_shared<JavaResizableBuffer>( + env, jexpander, output_vector_idx, value_buf, data_sz)); } else { buffers.push_back(std::make_shared<arrow::MutableBuffer>(value_buf, data_sz)); } auto array_data = arrow::ArrayData::Make(field->type(), output_row_count, buffers); output.push_back(array_data); + ++output_vector_idx; + } + if (!status.ok()) { + break; } status = holder->projector()->Evaluate(*in_batch, selection_vector.get(), output); } while (0); diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java index ef1d63a..520ef5f 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/JniWrapper.java @@ -48,6 +48,7 @@ public class JniWrapper { * Evaluate the expressions represented by the moduleId on a record batch * and store the output in ValueVectors. Throws an exception in case of errors * + * @param expander VectorExpander object. Used for callbacks from cpp. * @param moduleId moduleId representing expressions. Created using a call to * buildNativeCode * @param numRows Number of rows in the record batch @@ -61,7 +62,7 @@ public class JniWrapper { * @param outSizes The allocated size of the output buffers. 
On successful evaluation, * the result is stored in the output buffers */ - native void evaluateProjector(long moduleId, int numRows, + native void evaluateProjector(Object expander, long moduleId, int numRows, long[] bufAddrs, long[] bufSizes, int selectionVectorType, int selectionVectorSize, long selectionVectorBufferAddr, long selectionVectorBufferSize, diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java index 93657e6..c15d474 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/Projector.java @@ -27,6 +27,7 @@ import org.apache.arrow.gandiva.expression.ArrowTypeHelper; import org.apache.arrow.gandiva.expression.ExpressionTree; import org.apache.arrow.gandiva.ipc.GandivaTypes; import org.apache.arrow.gandiva.ipc.GandivaTypes.SelectionVectorType; +import org.apache.arrow.vector.BaseVariableWidthVector; import org.apache.arrow.vector.FixedWidthVector; import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VariableWidthVector; @@ -236,14 +237,18 @@ public class Projector { bufSizes[idx++] = bufLayout.getSize(); } + boolean hasVariableWidthColumns = false; + BaseVariableWidthVector[] resizableVectors = new BaseVariableWidthVector[outColumns.size()]; long[] outAddrs = new long[3 * outColumns.size()]; long[] outSizes = new long[3 * outColumns.size()]; idx = 0; + int outColumnIdx = 0; for (ValueVector valueVector : outColumns) { boolean isFixedWith = valueVector instanceof FixedWidthVector; boolean isVarWidth = valueVector instanceof VariableWidthVector; if (!isFixedWith && !isVarWidth) { - throw new UnsupportedTypeException("Unsupported value vector type " + valueVector.getField().getFieldType()); + throw new UnsupportedTypeException( + "Unsupported value vector type " + valueVector.getField().getFieldType()); } outAddrs[idx] = 
valueVector.getValidityBuffer().memoryAddress(); @@ -251,17 +256,24 @@ public class Projector { if (isVarWidth) { outAddrs[idx] = valueVector.getOffsetBuffer().memoryAddress(); outSizes[idx++] = valueVector.getOffsetBuffer().capacity(); + hasVariableWidthColumns = true; + + // save vector to allow for resizing. + resizableVectors[outColumnIdx] = (BaseVariableWidthVector)valueVector; } outAddrs[idx] = valueVector.getDataBuffer().memoryAddress(); outSizes[idx++] = valueVector.getDataBuffer().capacity(); valueVector.setValueCount(selectionVectorRecordCount); + outColumnIdx++; } - wrapper.evaluateProjector(this.moduleId, numRows, bufAddrs, bufSizes, - selectionVectorType, selectionVectorRecordCount, - selectionVectorAddr, selectionVectorSize, - outAddrs, outSizes); + wrapper.evaluateProjector( + hasVariableWidthColumns ? new VectorExpander(resizableVectors) : null, + this.moduleId, numRows, bufAddrs, bufSizes, + selectionVectorType, selectionVectorRecordCount, + selectionVectorAddr, selectionVectorSize, + outAddrs, outSizes); } /** diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java new file mode 100644 index 0000000..2414144 --- /dev/null +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/evaluator/VectorExpander.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.gandiva.evaluator; + +import org.apache.arrow.vector.BaseVariableWidthVector; + +/** + * This class provides the functionality to expand output vectors using a callback mechanism from + * gandiva. + */ +public class VectorExpander { + private final BaseVariableWidthVector[] vectors; + + public VectorExpander(BaseVariableWidthVector[] vectors) { + this.vectors = vectors; + } + + /** + * Result of vector expansion. + */ + public static class ExpandResult { + public long address; + public int capacity; + + public ExpandResult(long address, int capacity) { + this.address = address; + this.capacity = capacity; + } + } + + /** + * Expand vector at specified index. This is used as a back call from jni, and is only + * relevant for variable width vectors. + * + * @param index index of buffer in the list passed to jni. + * @param toCapacity the size to which the buffer should be expanded to. + * + * @return address and size of the buffer after expansion. 
+ */ + public ExpandResult expandOutputVectorAtIndex(int index, int toCapacity) { + if (index >= vectors.length || vectors[index] == null) { + throw new IllegalArgumentException("invalid index " + index); + } + + BaseVariableWidthVector vector = vectors[index]; + while (vector.getDataBuffer().capacity() < toCapacity) { + vector.reallocDataBuffer(); + } + return new ExpandResult( + vector.getDataBuffer().memoryAddress(), + vector.getDataBuffer().capacity()); + } + +} diff --git a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java index 2fd8091..52eeb16 100644 --- a/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java +++ b/java/gandiva/src/test/java/org/apache/arrow/gandiva/evaluator/ProjectorTest.java @@ -580,8 +580,6 @@ public class ProjectorTest extends BaseEvaluatorTest { // test with insufficient data buffer. try { outVector.allocateNew(4, numRows); - thrown.expect(GandivaException.class); - thrown.expectMessage("expand not implemented"); eval.evaluate(batch, output); } finally { releaseRecordBatch(batch);