This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 5c45fdc9 remove dead code (#1155)
5c45fdc9 is described below
commit 5c45fdc9e311da07a0f3510d9b8aec673081861a
Author: Andy Grove <[email protected]>
AuthorDate: Mon Dec 9 17:45:27 2024 -0700
remove dead code (#1155)
---
native/core/benches/bloom_filter_agg.rs | 2 -
native/core/benches/parquet_read.rs | 5 +-
native/core/src/errors.rs | 44 +-------
.../datafusion/expressions/bloom_filter_agg.rs | 6 --
native/core/src/execution/datafusion/planner.rs | 3 -
.../execution/datafusion/util/spark_bit_array.rs | 1 +
native/core/src/execution/jni_api.rs | 8 --
native/core/src/execution/kernels/strings.rs | 117 +--------------------
native/core/src/execution/operators/scan.rs | 8 --
native/core/src/execution/shuffle/list.rs | 4 +-
native/core/src/execution/shuffle/map.rs | 17 +--
native/core/src/execution/shuffle/row.rs | 5 +-
native/core/src/execution/utils.rs | 18 ----
native/core/src/jvm_bridge/batch_iterator.rs | 1 +
native/core/src/jvm_bridge/comet_metric_node.rs | 1 +
.../src/jvm_bridge/comet_task_memory_manager.rs | 1 +
native/core/src/jvm_bridge/mod.rs | 1 +
native/core/src/lib.rs | 1 -
native/core/src/parquet/mod.rs | 2 -
native/core/src/parquet/mutable_vector.rs | 8 --
native/core/src/parquet/read/column.rs | 14 +--
native/core/src/parquet/read/mod.rs | 3 -
native/core/src/parquet/read/values.rs | 26 ++---
.../core/src/parquet/util/test_common/page_util.rs | 12 +--
24 files changed, 27 insertions(+), 281 deletions(-)
diff --git a/native/core/benches/bloom_filter_agg.rs b/native/core/benches/bloom_filter_agg.rs
index af3eb919..25d27d17 100644
--- a/native/core/benches/bloom_filter_agg.rs
+++ b/native/core/benches/bloom_filter_agg.rs
@@ -61,10 +61,8 @@ fn criterion_benchmark(c: &mut Criterion) {
group.bench_function(agg_mode.0, |b| {
let comet_bloom_filter_agg =
Arc::new(AggregateUDF::new_from_impl(BloomFilterAgg::new(
- Arc::clone(&c0),
Arc::clone(&num_items),
Arc::clone(&num_bits),
- "bloom_filter_agg",
DataType::Binary,
)));
b.to_async(&rt).iter(|| {
diff --git a/native/core/benches/parquet_read.rs b/native/core/benches/parquet_read.rs
index 1f8178cd..ae511ade 100644
--- a/native/core/benches/parquet_read.rs
+++ b/native/core/benches/parquet_read.rs
@@ -44,7 +44,7 @@ fn bench(c: &mut Criterion) {
let mut group = c.benchmark_group("comet_parquet_read");
let schema = build_test_schema();
- let pages = build_plain_int32_pages(schema.clone(), schema.column(0), 0.0);
+ let pages = build_plain_int32_pages(schema.column(0), 0.0);
group.bench_function("INT/PLAIN/NOT_NULL", |b| {
let t = TypePtr::new(
PrimitiveTypeBuilder::new("f", PhysicalType::INT32)
@@ -107,7 +107,6 @@ const VALUES_PER_PAGE: usize = 10_000;
const BATCH_SIZE: usize = 4096;
fn build_plain_int32_pages(
- schema: SchemaDescPtr,
column_desc: ColumnDescPtr,
null_density: f32,
) -> impl PageIterator + Clone {
@@ -143,7 +142,7 @@ fn build_plain_int32_pages(
// Since `InMemoryPageReader` is not exposed from parquet crate, here we use
// `InMemoryPageIterator` instead which is a Iter<Iter<Page>>.
- InMemoryPageIterator::new(schema, column_desc, vec![pages])
+ InMemoryPageIterator::new(vec![pages])
}
struct TestColumnReader {
diff --git a/native/core/src/errors.rs b/native/core/src/errors.rs
index 92799bcf..4d623d97 100644
--- a/native/core/src/errors.rs
+++ b/native/core/src/errors.rs
@@ -485,23 +485,6 @@ where
|| f(t)
}
-// This is a duplicate of `try_unwrap_or_throw`, which is used to work around Arrow's lack of
-// `UnwindSafe` handling.
-pub fn try_assert_unwind_safe_or_throw<T, F>(env: &JNIEnv, f: F) -> T
-where
- T: JNIDefault,
- F: FnOnce(JNIEnv) -> Result<T, CometError>,
-{
- let mut env1 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
- let env2 = unsafe { JNIEnv::from_raw(env.get_raw()).unwrap() };
- unwrap_or_throw_default(
- &mut env1,
- flatten(
- catch_unwind(std::panic::AssertUnwindSafe(curry(f,
env2))).map_err(CometError::from),
- ),
- )
-}
-
// It is currently undefined behavior to unwind from Rust code into foreign code, so we can wrap
// our JNI functions and turn these panics into a `RuntimeException`.
pub fn try_unwrap_or_throw<T, F>(env: &JNIEnv, f: F) -> T
@@ -534,10 +517,7 @@ mod tests {
AttachGuard, InitArgsBuilder, JNIEnv, JNIVersion, JavaVM,
};
- use assertables::{
- assert_contains, assert_contains_as_result, assert_starts_with,
- assert_starts_with_as_result,
- };
+ use assertables::{assert_starts_with, assert_starts_with_as_result};
pub fn jvm() -> &'static Arc<JavaVM> {
static mut JVM: Option<Arc<JavaVM>> = None;
@@ -890,26 +870,4 @@ mod tests {
// first line.
assert_starts_with!(msg_rust, expected_message);
}
-
- // Asserts that exception's message matches `expected_message`.
- fn assert_exception_message_with_stacktrace(
- env: &mut JNIEnv,
- exception: JThrowable,
- expected_message: &str,
- stacktrace_contains: &str,
- ) {
- let message = env
- .call_method(exception, "getMessage", "()Ljava/lang/String;", &[])
- .unwrap()
- .l()
- .unwrap();
- let message_string = message.into();
- let msg_rust: String = env.get_string(&message_string).unwrap().into();
- // Since panics result in multi-line messages which include the backtrace, just use the
- // first line.
- assert_starts_with!(msg_rust, expected_message);
-
- // Check that the stacktrace is included by checking for a specific element
- assert_contains!(msg_rust, stacktrace_contains);
- }
}
diff --git a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
index e6528a56..1300e08c 100644
--- a/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
+++ b/native/core/src/execution/datafusion/expressions/bloom_filter_agg.rs
@@ -34,9 +34,7 @@ use datafusion_physical_expr::expressions::Literal;
#[derive(Debug, Clone)]
pub struct BloomFilterAgg {
- name: String,
signature: Signature,
- expr: Arc<dyn PhysicalExpr>,
num_items: i32,
num_bits: i32,
}
@@ -53,15 +51,12 @@ fn extract_i32_from_literal(expr: Arc<dyn PhysicalExpr>) ->
i32 {
impl BloomFilterAgg {
pub fn new(
- expr: Arc<dyn PhysicalExpr>,
num_items: Arc<dyn PhysicalExpr>,
num_bits: Arc<dyn PhysicalExpr>,
- name: impl Into<String>,
data_type: DataType,
) -> Self {
assert!(matches!(data_type, DataType::Binary));
Self {
- name: name.into(),
signature: Signature::uniform(
1,
vec![
@@ -73,7 +68,6 @@ impl BloomFilterAgg {
],
Volatility::Immutable,
),
- expr,
num_items: extract_i32_from_literal(num_items),
num_bits: extract_i32_from_literal(num_bits),
}
diff --git a/native/core/src/execution/datafusion/planner.rs b/native/core/src/execution/datafusion/planner.rs
index a83dba5d..5e77b3f6 100644
--- a/native/core/src/execution/datafusion/planner.rs
+++ b/native/core/src/execution/datafusion/planner.rs
@@ -115,7 +115,6 @@ use std::cmp::max;
use std::{collections::HashMap, sync::Arc};
// For clippy error on type_complexity.
-type ExecResult<T> = Result<T, ExecutionError>;
type PhyAggResult = Result<Vec<AggregateFunctionExpr>, ExecutionError>;
type PhyExprResult = Result<Vec<(Arc<dyn PhysicalExpr>, String)>,
ExecutionError>;
type PartitionPhyExprResult = Result<Vec<Arc<dyn PhysicalExpr>>,
ExecutionError>;
@@ -1758,10 +1757,8 @@ impl PhysicalPlanner {
self.create_expr(expr.num_bits.as_ref().unwrap(),
Arc::clone(&schema))?;
let datatype =
to_arrow_datatype(expr.datatype.as_ref().unwrap());
let func = AggregateUDF::new_from_impl(BloomFilterAgg::new(
- Arc::clone(&child),
Arc::clone(&num_items),
Arc::clone(&num_bits),
- "bloom_filter_agg",
datatype,
));
Self::create_aggr_func_expr("bloom_filter_agg", schema,
vec![child], func)
diff --git a/native/core/src/execution/datafusion/util/spark_bit_array.rs b/native/core/src/execution/datafusion/util/spark_bit_array.rs
index 68b97d66..6cfecc1b 100644
--- a/native/core/src/execution/datafusion/util/spark_bit_array.rs
+++ b/native/core/src/execution/datafusion/util/spark_bit_array.rs
@@ -70,6 +70,7 @@ impl SparkBitArray {
self.data.len()
}
+ #[allow(dead_code)] // this is only called from tests
pub fn cardinality(&self) -> usize {
self.bit_count
}
diff --git a/native/core/src/execution/jni_api.rs b/native/core/src/execution/jni_api.rs
index 8afe134c..5103f5ce 100644
--- a/native/core/src/execution/jni_api.rs
+++ b/native/core/src/execution/jni_api.rs
@@ -207,14 +207,6 @@ fn prepare_datafusion_session_context(
Ok(session_ctx)
}
-fn parse_bool(conf: &HashMap<String, String>, name: &str) -> CometResult<bool>
{
- conf.get(name)
- .map(String::as_str)
- .unwrap_or("false")
- .parse::<bool>()
- .map_err(|e| CometError::Config(format!("Failed to parse boolean
config {name}: {e}")))
-}
-
/// Prepares arrow arrays for output.
fn prepare_output(
env: &mut JNIEnv,
diff --git a/native/core/src/execution/kernels/strings.rs b/native/core/src/execution/kernels/strings.rs
index 2e5e67b6..d63b2c47 100644
--- a/native/core/src/execution/kernels/strings.rs
+++ b/native/core/src/execution/kernels/strings.rs
@@ -21,7 +21,7 @@ use std::sync::Arc;
use arrow::{
array::*,
- buffer::{Buffer, MutableBuffer},
+ buffer::MutableBuffer,
compute::kernels::substring::{substring as arrow_substring,
substring_by_char},
datatypes::{DataType, Int32Type},
};
@@ -87,43 +87,6 @@ pub fn substring(array: &dyn Array, start: i64, length: u64)
-> Result<ArrayRef,
}
}
-/// Returns an ArrayRef with a substring starting from `start` and length.
-///
-/// # Preconditions
-///
-/// - `start` can be negative, in which case the start counts from the end of the string.
-/// - `array` must be either [`StringArray`] or [`LargeStringArray`].
-///
-/// Note: this is different from arrow-rs `substring` kernel in that both `start` and `length` are
-/// `Int32Array` here.
-pub fn substring_with_array(
- array: &dyn Array,
- start: &Int32Array,
- length: &Int32Array,
-) -> ArrayRef {
- match array.data_type() {
- DataType::LargeUtf8 => generic_substring(
- array
- .as_any()
- .downcast_ref::<LargeStringArray>()
- .expect("A large string is expected"),
- start,
- length,
- |i| i as i64,
- ),
- DataType::Utf8 => generic_substring(
- array
- .as_any()
- .downcast_ref::<StringArray>()
- .expect("A string is expected"),
- start,
- length,
- |i| i,
- ),
- _ => panic!("substring does not support type {:?}", array.data_type()),
- }
-}
-
fn generic_string_space<OffsetSize: OffsetSizeTrait>(length: &Int32Array) ->
ArrayRef {
let array_len = length.len();
let mut offsets = MutableBuffer::new((array_len + 1) *
std::mem::size_of::<OffsetSize>());
@@ -163,81 +126,3 @@ fn generic_string_space<OffsetSize:
OffsetSizeTrait>(length: &Int32Array) -> Arr
};
make_array(data)
}
-
-fn generic_substring<OffsetSize: OffsetSizeTrait, F>(
- array: &GenericStringArray<OffsetSize>,
- start: &Int32Array,
- length: &Int32Array,
- f: F,
-) -> ArrayRef
-where
- F: Fn(i32) -> OffsetSize,
-{
- assert_eq!(array.len(), start.len());
- assert_eq!(array.len(), length.len());
-
- // compute current offsets
- let offsets = array.to_data().buffers()[0].clone();
- let offsets: &[OffsetSize] = offsets.typed_data::<OffsetSize>();
-
- // compute null bitmap (copy)
- let null_bit_buffer = array.to_data().nulls().map(|b| b.buffer().clone());
-
- // Gets slices of start and length arrays to access them directly for performance.
- let start_data = start.to_data();
- let length_data = length.to_data();
- let starts = start_data.buffers()[0].typed_data::<i32>();
- let lengths = length_data.buffers()[0].typed_data::<i32>();
-
- // compute values
- let array_data = array.to_data();
- let values = &array_data.buffers()[1];
- let data = values.as_slice();
-
- // we have no way to estimate how much this will be.
- let mut new_values = MutableBuffer::new(0);
- let mut new_offsets: Vec<OffsetSize> = Vec::with_capacity(array.len() + 1);
-
- let mut length_so_far = OffsetSize::zero();
- new_offsets.push(length_so_far);
- (0..array.len()).for_each(|i| {
- // the length of this entry
- let length_i: OffsetSize = offsets[i + 1] - offsets[i];
- // compute where we should start slicing this entry
- let start_pos: OffsetSize = f(starts[i]);
-
- let start = offsets[i]
- + if start_pos >= OffsetSize::zero() {
- start_pos
- } else {
- length_i + start_pos
- };
-
- let start = start.clamp(offsets[i], offsets[i + 1]);
- // compute the length of the slice
- let slice_length: OffsetSize = f(lengths[i].max(0)).min(offsets[i + 1]
- start);
-
- length_so_far += slice_length;
-
- new_offsets.push(length_so_far);
-
- // we need usize for ranges
- let start = start.to_usize().unwrap();
- let slice_length = slice_length.to_usize().unwrap();
-
- new_values.extend_from_slice(&data[start..start + slice_length]);
- });
-
- let data = unsafe {
- ArrayData::new_unchecked(
- GenericStringArray::<OffsetSize>::DATA_TYPE,
- array.len(),
- None,
- null_bit_buffer,
- 0,
- vec![Buffer::from_slice_ref(&new_offsets), new_values.into()],
- vec![],
- )
- };
- make_array(data)
-}
diff --git a/native/core/src/execution/operators/scan.rs b/native/core/src/execution/operators/scan.rs
index a97caf0d..0d35859d 100644
--- a/native/core/src/execution/operators/scan.rs
+++ b/native/core/src/execution/operators/scan.rs
@@ -525,12 +525,4 @@ impl InputBatch {
InputBatch::Batch(columns, num_rows)
}
-
- /// Get the number of rows in this batch
- fn num_rows(&self) -> usize {
- match self {
- Self::EOF => 0,
- Self::Batch(_, num_rows) => *num_rows,
- }
- }
}
diff --git a/native/core/src/execution/shuffle/list.rs b/native/core/src/execution/shuffle/list.rs
index d8bdcb19..0f7f3e8c 100644
--- a/native/core/src/execution/shuffle/list.rs
+++ b/native/core/src/execution/shuffle/list.rs
@@ -28,7 +28,6 @@ use arrow_schema::{DataType, TimeUnit};
pub struct SparkUnsafeArray {
row_addr: i64,
- row_size: i32,
num_elements: usize,
element_offset: i64,
}
@@ -45,7 +44,7 @@ impl SparkUnsafeObject for SparkUnsafeArray {
impl SparkUnsafeArray {
/// Creates a `SparkUnsafeArray` which points to the given address and size in bytes.
- pub fn new(addr: i64, size: i32) -> Self {
+ pub fn new(addr: i64) -> Self {
// Read the number of elements from the first 8 bytes.
let slice: &[u8] = unsafe { std::slice::from_raw_parts(addr as *const
u8, 8) };
let num_elements = i64::from_le_bytes(slice.try_into().unwrap());
@@ -60,7 +59,6 @@ impl SparkUnsafeArray {
Self {
row_addr: addr,
- row_size: size,
num_elements: num_elements as usize,
element_offset: addr +
Self::get_header_portion_in_bytes(num_elements),
}
diff --git a/native/core/src/execution/shuffle/map.rs b/native/core/src/execution/shuffle/map.rs
index 01469529..0969168f 100644
--- a/native/core/src/execution/shuffle/map.rs
+++ b/native/core/src/execution/shuffle/map.rs
@@ -30,8 +30,6 @@ use arrow_array::builder::{
use arrow_schema::{DataType, FieldRef, Fields, TimeUnit};
pub struct SparkUnsafeMap {
- row_addr: i64,
- row_size: i32,
pub(crate) keys: SparkUnsafeArray,
pub(crate) values: SparkUnsafeArray,
}
@@ -59,8 +57,8 @@ impl SparkUnsafeMap {
panic!("Negative value size in bytes of map: {}",
value_array_size);
}
- let keys = SparkUnsafeArray::new(addr + 8, key_array_size as i32);
- let values = SparkUnsafeArray::new(addr + 8 + key_array_size,
value_array_size);
+ let keys = SparkUnsafeArray::new(addr + 8);
+ let values = SparkUnsafeArray::new(addr + 8 + key_array_size);
if keys.get_num_elements() != values.get_num_elements() {
panic!(
@@ -70,16 +68,7 @@ impl SparkUnsafeMap {
);
}
- Self {
- row_addr: addr,
- row_size: size,
- keys,
- values,
- }
- }
-
- pub(crate) fn get_num_elements(&self) -> usize {
- self.keys.get_num_elements()
+ Self { keys, values }
}
}
diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs
index 2aeb4881..17b180e9 100644
--- a/native/core/src/execution/shuffle/row.rs
+++ b/native/core/src/execution/shuffle/row.rs
@@ -48,7 +48,6 @@ use std::{
sync::Arc,
};
-const WORD_SIZE: i64 = 8;
const MAX_LONG_DIGITS: u8 = 18;
const NESTED_TYPE_BUILDER_CAPACITY: usize = 100;
@@ -170,8 +169,8 @@ pub trait SparkUnsafeObject {
/// Returns array value at the given index of the object.
fn get_array(&self, index: usize) -> SparkUnsafeArray {
- let (offset, len) = self.get_offset_and_len(index);
- SparkUnsafeArray::new(self.get_row_addr() + offset as i64, len)
+ let (offset, _) = self.get_offset_and_len(index);
+ SparkUnsafeArray::new(self.get_row_addr() + offset as i64)
}
fn get_map(&self, index: usize) -> SparkUnsafeMap {
diff --git a/native/core/src/execution/utils.rs b/native/core/src/execution/utils.rs
index 553d4260..4992b7ba 100644
--- a/native/core/src/execution/utils.rs
+++ b/native/core/src/execution/utils.rs
@@ -15,8 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use std::sync::Arc;
-
use arrow::{
array::ArrayData,
error::ArrowError,
@@ -52,10 +50,6 @@ pub trait SparkArrowConvert {
where
Self: Sized;
- /// Convert Arrow Arrays to C data interface.
- /// It returns a tuple (ArrowArray address, ArrowSchema address).
- fn to_spark(&self) -> Result<(i64, i64), ExecutionError>;
-
/// Move Arrow Arrays to C data interface.
fn move_to_spark(&self, array: i64, schema: i64) -> Result<(),
ExecutionError>;
}
@@ -88,18 +82,6 @@ impl SparkArrowConvert for ArrayData {
Ok(ffi_array)
}
- /// Converts this ArrowData to pointers of Arrow C data interface.
- /// Returned pointers are Arc-ed and should be freed manually.
- #[allow(clippy::arc_with_non_send_sync)]
- fn to_spark(&self) -> Result<(i64, i64), ExecutionError> {
- let arrow_array = Arc::new(FFI_ArrowArray::new(self));
- let arrow_schema =
Arc::new(FFI_ArrowSchema::try_from(self.data_type())?);
-
- let (array, schema) = (Arc::into_raw(arrow_array),
Arc::into_raw(arrow_schema));
-
- Ok((array as i64, schema as i64))
- }
-
/// Move this ArrowData to pointers of Arrow C data interface.
fn move_to_spark(&self, array: i64, schema: i64) -> Result<(),
ExecutionError> {
let array_ptr = array as *mut FFI_ArrowArray;
diff --git a/native/core/src/jvm_bridge/batch_iterator.rs b/native/core/src/jvm_bridge/batch_iterator.rs
index 45b10cf2..998e540c 100644
--- a/native/core/src/jvm_bridge/batch_iterator.rs
+++ b/native/core/src/jvm_bridge/batch_iterator.rs
@@ -24,6 +24,7 @@ use jni::{
};
/// A struct that holds all the JNI methods and fields for JVM `CometBatchIterator` class.
+#[allow(dead_code)] // we need to keep references to Java items to prevent GC
pub struct CometBatchIterator<'a> {
pub class: JClass<'a>,
pub method_has_next: JMethodID,
diff --git a/native/core/src/jvm_bridge/comet_metric_node.rs b/native/core/src/jvm_bridge/comet_metric_node.rs
index 8647e071..85386d9b 100644
--- a/native/core/src/jvm_bridge/comet_metric_node.rs
+++ b/native/core/src/jvm_bridge/comet_metric_node.rs
@@ -23,6 +23,7 @@ use jni::{
};
/// A struct that holds all the JNI methods and fields for JVM CometMetricNode class.
+#[allow(dead_code)] // we need to keep references to Java items to prevent GC
pub struct CometMetricNode<'a> {
pub class: JClass<'a>,
pub method_get_child_node: JMethodID,
diff --git a/native/core/src/jvm_bridge/comet_task_memory_manager.rs b/native/core/src/jvm_bridge/comet_task_memory_manager.rs
index 97d1bf3a..22c3332c 100644
--- a/native/core/src/jvm_bridge/comet_task_memory_manager.rs
+++ b/native/core/src/jvm_bridge/comet_task_memory_manager.rs
@@ -25,6 +25,7 @@ use jni::{
/// A wrapper which delegate acquire/release memory calls to the
/// JVM side `CometTaskMemoryManager`.
#[derive(Debug)]
+#[allow(dead_code)] // we need to keep references to Java items to prevent GC
pub struct CometTaskMemoryManager<'a> {
pub class: JClass<'a>,
pub method_acquire_memory: JMethodID,
diff --git a/native/core/src/jvm_bridge/mod.rs b/native/core/src/jvm_bridge/mod.rs
index 4936b1c5..5fc0a55e 100644
--- a/native/core/src/jvm_bridge/mod.rs
+++ b/native/core/src/jvm_bridge/mod.rs
@@ -189,6 +189,7 @@ pub use comet_metric_node::*;
pub use comet_task_memory_manager::*;
/// The JVM classes that are used in the JNI calls.
+#[allow(dead_code)] // we need to keep references to Java items to prevent GC
pub struct JVMClasses<'a> {
/// Cached JClass for "java.lang.Object"
java_lang_object: JClass<'a>,
diff --git a/native/core/src/lib.rs b/native/core/src/lib.rs
index 68c8ae72..cab511fa 100644
--- a/native/core/src/lib.rs
+++ b/native/core/src/lib.rs
@@ -17,7 +17,6 @@
#![allow(incomplete_features)]
#![allow(non_camel_case_types)]
-#![allow(dead_code)]
#![allow(clippy::upper_case_acronyms)]
// For prost generated struct
#![allow(clippy::derive_partial_eq_without_eq)]
diff --git a/native/core/src/parquet/mod.rs b/native/core/src/parquet/mod.rs
index 455f1992..d2a6f480 100644
--- a/native/core/src/parquet/mod.rs
+++ b/native/core/src/parquet/mod.rs
@@ -47,8 +47,6 @@ use util::jni::{convert_column_descriptor, convert_encoding};
use self::util::jni::TypePromotionInfo;
-const STR_CLASS_NAME: &str = "java/lang/String";
-
/// Parquet read context maintained across multiple JNI calls.
struct Context {
pub column_reader: ColumnReader,
diff --git a/native/core/src/parquet/mutable_vector.rs b/native/core/src/parquet/mutable_vector.rs
index 7f30d7d8..d19ea32f 100644
--- a/native/core/src/parquet/mutable_vector.rs
+++ b/native/core/src/parquet/mutable_vector.rs
@@ -40,12 +40,6 @@ pub struct ParquetMutableVector {
/// The number of null elements in this vector, must <= `num_values`.
pub(crate) num_nulls: usize,
- /// The capacity of the vector
- pub(crate) capacity: usize,
-
- /// How many bits are required to store a single value
- pub(crate) bit_width: usize,
-
/// The validity buffer of this Arrow vector. A bit set at position `i` indicates the `i`th
/// element is not null. Otherwise, an unset bit at position `i` indicates the `i`th element is
/// null.
@@ -109,8 +103,6 @@ impl ParquetMutableVector {
arrow_type,
num_values: 0,
num_nulls: 0,
- capacity,
- bit_width,
validity_buffer,
value_buffer,
children,
diff --git a/native/core/src/parquet/read/column.rs b/native/core/src/parquet/read/column.rs
index 73f8df95..05a0bf7b 100644
--- a/native/core/src/parquet/read/column.rs
+++ b/native/core/src/parquet/read/column.rs
@@ -770,7 +770,7 @@ impl<T: DataType> TypedColumnReader<T> {
// Create a new vector for dictionary values
let mut value_vector = ParquetMutableVector::new(page_value_count,
&self.arrow_type);
- let mut dictionary = self.get_decoder(page_data, page_value_count,
encoding);
+ let mut dictionary = self.get_decoder(page_data, encoding);
dictionary.read_batch(&mut value_vector, page_value_count);
value_vector.num_values = page_value_count;
@@ -812,7 +812,7 @@ impl<T: DataType> TypedColumnReader<T> {
self.def_level_decoder = Some(dl_decoder);
page_buffer = page_buffer.slice(offset);
- let value_decoder = self.get_decoder(page_buffer, page_value_count,
encoding);
+ let value_decoder = self.get_decoder(page_buffer, encoding);
self.value_decoder = Some(value_decoder);
}
@@ -838,7 +838,7 @@ impl<T: DataType> TypedColumnReader<T> {
dl_decoder.set_data(page_value_count, &def_level_data);
self.def_level_decoder = Some(dl_decoder);
- let value_decoder = self.get_decoder(value_data, page_value_count,
encoding);
+ let value_decoder = self.get_decoder(value_data, encoding);
self.value_decoder = Some(value_decoder);
}
@@ -977,15 +977,9 @@ impl<T: DataType> TypedColumnReader<T> {
}
}
- fn get_decoder(
- &self,
- value_data: Buffer,
- page_value_count: usize,
- encoding: Encoding,
- ) -> Box<dyn Decoder> {
+ fn get_decoder(&self, value_data: Buffer, encoding: Encoding) -> Box<dyn
Decoder> {
get_decoder::<T>(
value_data,
- page_value_count,
encoding,
Arc::clone(&self.desc),
self.read_options,
diff --git a/native/core/src/parquet/read/mod.rs b/native/core/src/parquet/read/mod.rs
index 4d057a06..5a55f211 100644
--- a/native/core/src/parquet/read/mod.rs
+++ b/native/core/src/parquet/read/mod.rs
@@ -44,9 +44,6 @@ pub struct PlainDecoderInner {
/// The current offset in `data`, in bytes.
offset: usize,
- /// The number of total values in `data`
- value_count: usize,
-
/// Reads `data` bit by bit, used if `T` is [`BoolType`].
bit_reader: BitReader,
diff --git a/native/core/src/parquet/read/values.rs b/native/core/src/parquet/read/values.rs
index 71cd035d..e28d695e 100644
--- a/native/core/src/parquet/read/values.rs
+++ b/native/core/src/parquet/read/values.rs
@@ -34,20 +34,16 @@ use datafusion_comet_spark_expr::utils::unlikely;
pub fn get_decoder<T: DataType>(
value_data: Buffer,
- num_values: usize,
encoding: Encoding,
desc: ColumnDescPtr,
read_options: ReadOptions,
) -> Box<dyn Decoder> {
let decoder: Box<dyn Decoder> = match encoding {
- Encoding::PLAIN | Encoding::PLAIN_DICTIONARY =>
Box::new(PlainDecoder::<T>::new(
- value_data,
- num_values,
- desc,
- read_options,
- )),
+ Encoding::PLAIN | Encoding::PLAIN_DICTIONARY => {
+ Box::new(PlainDecoder::<T>::new(value_data, desc, read_options))
+ }
// This is for dictionary indices
- Encoding::RLE_DICTIONARY => Box::new(DictDecoder::new(value_data,
num_values)),
+ Encoding::RLE_DICTIONARY => Box::new(DictDecoder::new(value_data)),
_ => panic!("Unsupported encoding: {}", encoding),
};
decoder
@@ -108,17 +104,11 @@ pub struct PlainDecoder<T: DataType> {
}
impl<T: DataType> PlainDecoder<T> {
- pub fn new(
- value_data: Buffer,
- num_values: usize,
- desc: ColumnDescPtr,
- read_options: ReadOptions,
- ) -> Self {
+ pub fn new(value_data: Buffer, desc: ColumnDescPtr, read_options:
ReadOptions) -> Self {
let len = value_data.len();
let inner = PlainDecoderInner {
data: value_data.clone(),
offset: 0,
- value_count: num_values,
bit_reader: BitReader::new(value_data, len),
read_options,
desc,
@@ -938,9 +928,6 @@ pub struct DictDecoder {
/// Number of bits used to represent dictionary indices. Must be between `[0, 64]`.
bit_width: usize,
- /// The number of total values in `data`
- value_count: usize,
-
/// Bit reader
bit_reader: BitReader,
@@ -955,12 +942,11 @@ pub struct DictDecoder {
}
impl DictDecoder {
- pub fn new(buf: Buffer, num_values: usize) -> Self {
+ pub fn new(buf: Buffer) -> Self {
let bit_width = buf.as_bytes()[0] as usize;
Self {
bit_width,
- value_count: num_values,
bit_reader: BitReader::new_all(buf.slice(1)),
rle_left: 0,
bit_packed_left: 0,
diff --git a/native/core/src/parquet/util/test_common/page_util.rs b/native/core/src/parquet/util/test_common/page_util.rs
index e20cc30c..333298bc 100644
--- a/native/core/src/parquet/util/test_common/page_util.rs
+++ b/native/core/src/parquet/util/test_common/page_util.rs
@@ -28,7 +28,7 @@ use parquet::{
levels::{max_buffer_size, LevelEncoder},
},
errors::Result,
- schema::types::{ColumnDescPtr, SchemaDescPtr},
+ schema::types::ColumnDescPtr,
};
use super::random_numbers_range;
@@ -201,20 +201,12 @@ impl<P: Iterator<Item = Page> + Send> Iterator for
InMemoryPageReader<P> {
/// A utility page iterator which stores page readers in memory, used for tests.
#[derive(Clone)]
pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
- schema: SchemaDescPtr,
- column_desc: ColumnDescPtr,
page_reader_iter: I,
}
impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
- pub fn new(
- schema: SchemaDescPtr,
- column_desc: ColumnDescPtr,
- pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>,
- ) -> Self {
+ pub fn new(pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>) ->
Self {
Self {
- schema,
- column_desc,
page_reader_iter: pages.into_iter(),
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]