This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new bf95de9 feat: implement get_long for GenericRow (#49)
bf95de9 is described below
commit bf95de9b607c5901543a9ced1dc0fd45235ba745
Author: Yang Guo <[email protected]>
AuthorDate: Sun Nov 30 00:15:25 2025 +0800
feat: implement get_long for GenericRow (#49)
---
crates/examples/src/example_table.rs | 10 +++++++---
crates/fluss/src/row/datum.rs | 19 +++++++++++++++++++
crates/fluss/src/row/mod.rs | 2 +-
3 files changed, 27 insertions(+), 4 deletions(-)
diff --git a/crates/examples/src/example_table.rs
b/crates/examples/src/example_table.rs
index 3eb8dd8..deab363 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -27,7 +27,7 @@ use tokio::try_join;
#[tokio::main]
pub async fn main() -> Result<()> {
let mut config = Config::parse();
- config.bootstrap_server = Some("127.0.0.1:56405".to_string());
+ config.bootstrap_server = Some("127.0.0.1:9123".to_string());
let conn = FlussConnection::new(config).await?;
@@ -36,11 +36,12 @@ pub async fn main() -> Result<()> {
Schema::builder()
.column("c1", DataTypes::int())
.column("c2", DataTypes::string())
+ .column("c3", DataTypes::bigint())
.build()?,
)
.build()?;
- let table_path = TablePath::new("fluss".to_owned(),
"rust_test".to_owned());
+ let table_path = TablePath::new("fluss".to_owned(),
"rust_test_long".to_owned());
let admin = conn.get_admin().await?;
@@ -56,6 +57,7 @@ pub async fn main() -> Result<()> {
let mut row = GenericRow::new();
row.set_field(0, 22222);
row.set_field(1, "t2t");
+ row.set_field(2, 123_456_789_123i64);
let table = conn.get_table(&table_path).await?;
let append_writer = table.new_append()?.create_writer();
@@ -63,6 +65,7 @@ pub async fn main() -> Result<()> {
row = GenericRow::new();
row.set_field(0, 233333);
row.set_field(1, "tt44");
+ row.set_field(2, 987_654_321_987i64);
let f2 = append_writer.append(row);
try_join!(f1, f2, append_writer.flush())?;
@@ -76,9 +79,10 @@ pub async fn main() -> Result<()> {
for record in scan_records {
let row = record.row();
println!(
- "{{{}, {}}}@{}",
+ "{{{}, {}, {}}}@{}",
row.get_int(0),
row.get_string(1),
+ row.get_long(2),
record.offset()
);
}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index d8c4f74..ed33b8b 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -88,6 +88,13 @@ impl<'a> From<i32> for Datum<'a> {
}
}
+impl<'a> From<i64> for Datum<'a> {
+ #[inline]
+ fn from(i: i64) -> Datum<'a> {
+ Datum::Int64(i)
+ }
+}
+
impl<'a> From<&'a str> for Datum<'a> {
#[inline]
fn from(s: &'a str) -> Datum<'a> {
@@ -127,6 +134,18 @@ impl TryFrom<&Datum<'_>> for i32 {
}
}
+impl TryFrom<&Datum<'_>> for i64 {
+ type Error = ();
+
+ #[inline]
+ fn try_from(from: &Datum) -> std::result::Result<Self, Self::Error> {
+ match from {
+ Datum::Int64(i) => Ok(*i),
+ _ => Err(()),
+ }
+ }
+}
+
impl<'a> TryFrom<&Datum<'a>> for &'a str {
type Error = ();
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index b900cb5..aa2c411 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -102,7 +102,7 @@ impl<'a> InternalRow for GenericRow<'a> {
}
fn get_long(&self, _pos: usize) -> i64 {
- todo!()
+ self.values.get(_pos).unwrap().try_into().unwrap()
}
fn get_float(&self, _pos: usize) -> f32 {