This is an automated email from the ASF dual-hosted git repository.

yaozhq pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6526b69b [ISSUE-592] feat: Add a Streaming Source for Hive; add stream 
hive reader (#594)
6526b69b is described below

commit 6526b69bbce536d4e918ec9fe6f6c32b9f05cb20
Author: DukeWangYu <[email protected]>
AuthorDate: Mon Aug 25 11:33:23 2025 +0800

    [ISSUE-592] feat: Add a Streaming Source for Hive; add stream hive reader 
(#594)
    
    * add stream hive reader
    
    * add licenses to test case
    
    * refine codes after review
---
 .../geaflow/dsl/connector/api/FetchData.java       | 11 ++++
 .../geaflow/dsl/connector/hive/HiveReader.java     | 36 ++++++++++-
 .../dsl/connector/hive/HiveTableSource.java        |  6 ++
 .../dsl/connector/hive/HiveTableSourceTest.java    | 69 +++++++++++++++-------
 .../geaflow/dsl/runtime/query/HiveSourceTest.java  | 36 +++++++++++
 .../src/test/resources/query/hive_source_001.sql   | 45 ++++++++++++++
 6 files changed, 180 insertions(+), 23 deletions(-)

diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java
index 89451288..cc042dcf 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java
@@ -73,6 +73,17 @@ public class FetchData<T> implements Serializable {
         return nextOffset;
     }
 
+    public void seek(long seekPos) {
+        long toSkip = seekPos;
+        while (toSkip > 0) {
+            if (!dataIterator.hasNext()) {
+                throw new RuntimeException("seek pos:" + seekPos + " exceed 
the split size: " + (seekPos - toSkip));
+            }
+            dataIterator.next();
+            toSkip --;
+        }
+    }
+
     /**
      * Returns true if the fetch has finished for the partition.
      */
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java
index 175929e1..62f648b6 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java
@@ -20,12 +20,15 @@
 package org.apache.geaflow.dsl.connector.hive;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
 import org.apache.geaflow.common.utils.ClassUtil;
 import org.apache.geaflow.dsl.common.data.Row;
 import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
@@ -50,7 +53,7 @@ public class HiveReader {
     private final RecordReader<Writable, Writable> recordReader;
     private final StructType readSchema;
     private final Deserializer deserializer;
-
+    private long fetchOffset;
 
     public HiveReader(RecordReader<Writable, Writable> recordReader, 
StructType readSchema,
                       StorageDescriptor sd, Properties tableProps) {
@@ -59,6 +62,7 @@ public class HiveReader {
             f -> new TableField(f.getName().toLowerCase(Locale.ROOT), 
f.getType(), f.isNullable()))
             .collect(Collectors.toList()));
         this.deserializer = 
ClassUtil.newInstance(sd.getSerdeInfo().getSerializationLib());
+        this.fetchOffset = 0L;
         try {
             org.apache.hadoop.conf.Configuration conf = new 
org.apache.hadoop.conf.Configuration();
             SerDeUtils.initializeSerDe(deserializer, conf, tableProps, null);
@@ -69,11 +73,37 @@ public class HiveReader {
     }
 
     public FetchData<Row> read(long windowSize, String[] partitionValues) {
+        Iterator<Row> hiveIterator = new HiveIterator(recordReader, 
deserializer, partitionValues, readSchema);
         if (windowSize == Long.MAX_VALUE) {
-            Iterator<Row> hiveIterator = new HiveIterator(recordReader, 
deserializer, partitionValues, readSchema);
             return FetchData.createBatchFetch(hiveIterator, new 
HiveOffset(-1L));
         } else {
-            throw new GeaFlowDSLException("Cannot support stream read for 
hive");
+            long fetchCnt = 0L;
+            List<Row> rows = new ArrayList<>();
+            while (fetchCnt < windowSize) {
+                if (hiveIterator.hasNext()) {
+                    fetchCnt ++;
+                    rows.add(hiveIterator.next());
+                } else {
+                    break;
+                }
+            }
+            fetchOffset += fetchCnt;
+            return FetchData.createStreamFetch(rows, new 
HiveOffset(fetchOffset), fetchCnt < windowSize);
+        }
+    }
+
+    public void seek(long seekPos) {
+        try {
+            Writable key = recordReader.createKey();
+            Writable value = recordReader.createValue();
+            fetchOffset = seekPos;
+            while (seekPos-- > 0) {
+                if (!recordReader.next(key, value)) {
+                    throw new GeaflowRuntimeException("fetch offset is out of 
range: " + fetchOffset);
+                }
+            }
+        } catch (Exception e) {
+            throw new GeaflowRuntimeException(e);
         }
     }
 
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java
index bdf8232f..72f4ad87 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java
@@ -213,6 +213,12 @@ public class HiveTableSource implements TableSource, 
EnablePartitionPushDown {
             }
             reader = new HiveReader(recordReader, dataSchema, 
hivePartition.getSd(), tableProps);
             partitionReaders.put(partition.getName(), reader);
+            if (startOffset.isPresent()) {
+                long seekOffset = startOffset.get().getOffset();
+                if (seekOffset > 0) {
+                    reader.seek(seekOffset);
+                }
+            }
         }
         try {
             return (FetchData<T>) reader.read(desireWindowSize, 
hivePartition.getPartitionValues());
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java
index bd1140bf..b6a10e30 100644
--- 
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java
@@ -36,7 +36,9 @@ import org.apache.geaflow.dsl.common.types.TableSchema;
 import org.apache.geaflow.dsl.connector.api.FetchData;
 import org.apache.geaflow.dsl.connector.api.Partition;
 import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.AbstractFetchWindow;
 import org.apache.geaflow.dsl.connector.api.window.AllFetchWindow;
+import org.apache.geaflow.dsl.connector.api.window.SizeFetchWindow;
 import org.apache.geaflow.runtime.core.context.DefaultRuntimeContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,12 +68,13 @@ public class HiveTableSourceTest extends BaseHiveTest {
             new TableField("name", BinaryStringType.INSTANCE, true),
             new TableField("age", IntegerType.INSTANCE, false)
         );
-        checkReadHive(ddl, inserts, dataSchema, new StructType(),
-            "[1, jim, 20]\n"
-                + "[2, kate, 18]\n"
-                + "[3, lily, 22]\n"
-                + "[4, lucy, 25]\n"
-                + "[5, jack, 26]");
+        String expected = "[1, jim, 20]\n"
+            + "[2, kate, 18]\n"
+            + "[3, lily, 22]\n"
+            + "[4, lucy, 25]\n"
+            + "[5, jack, 26]";
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), false, 
expected);
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), true, 
expected);
     }
 
     @Test
@@ -87,12 +90,13 @@ public class HiveTableSourceTest extends BaseHiveTest {
             new TableField("name", BinaryStringType.INSTANCE, true),
             new TableField("age", IntegerType.INSTANCE, false)
         );
-        checkReadHive(ddl, inserts, dataSchema, new StructType(),
-            "[1, jim, 20]\n"
-                + "[2, kate, 18]\n"
-                + "[3, lily, 22]\n"
-                + "[4, lucy, 25]\n"
-                + "[5, jack, 26]");
+        String expected = "[1, jim, 20]\n"
+            + "[2, kate, 18]\n"
+            + "[3, lily, 22]\n"
+            + "[4, lucy, 25]\n"
+            + "[5, jack, 26]";
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), true, 
expected);
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), false, 
expected);
     }
 
     @Test
@@ -108,12 +112,13 @@ public class HiveTableSourceTest extends BaseHiveTest {
             new TableField("name", BinaryStringType.INSTANCE, true),
             new TableField("age", IntegerType.INSTANCE, false)
         );
-        checkReadHive(ddl, inserts, dataSchema, new StructType(),
-            "[1, jim, 20]\n"
-                + "[2, kate, 18]\n"
-                + "[3, lily, 22]\n"
-                + "[4, lucy, 25]\n"
-                + "[5, jack, 26]");
+        String expected = "[1, jim, 20]\n"
+            + "[2, kate, 18]\n"
+            + "[3, lily, 22]\n"
+            + "[4, lucy, 25]\n"
+            + "[5, jack, 26]";
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), false, 
expected);
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), true, 
expected);
     }
 
     @Test
@@ -135,12 +140,23 @@ public class HiveTableSourceTest extends BaseHiveTest {
         StructType partitionSchema = new StructType(
             new TableField("dt", BinaryStringType.INSTANCE, false)
         );
-        checkReadHive(ddl, inserts, dataSchema, partitionSchema,
+        String expected = "[1, jim, 20, 2023-04-23]\n"
+            + "[2, kate, 18, 2023-04-24]\n"
+            + "[3, lily, 22, 2023-04-24]\n"
+            + "[4, lucy, 25, 2023-04-25]\n"
+            + "[5, jack, 26, 2023-04-26]";
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, false,
             "[1, jim, 20, 2023-04-23]\n"
                 + "[2, kate, 18, 2023-04-24]\n"
                 + "[3, lily, 22, 2023-04-24]\n"
                 + "[4, lucy, 25, 2023-04-25]\n"
                 + "[5, jack, 26, 2023-04-26]");
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, true,
+                "[1, jim, 20, 2023-04-23]\n"
+                        + "[2, kate, 18, 2023-04-24]\n"
+                        + "[3, lily, 22, 2023-04-24]\n"
+                        + "[4, lucy, 25, 2023-04-25]\n"
+                        + "[5, jack, 26, 2023-04-26]");
     }
 
     @Test
@@ -163,16 +179,23 @@ public class HiveTableSourceTest extends BaseHiveTest {
             new TableField("hh", BinaryStringType.INSTANCE, false),
             new TableField("dt", BinaryStringType.INSTANCE, false)
         );
-        checkReadHive(ddl, inserts, dataSchema, partitionSchema,
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, false,
             "[1, jim, 20, 10, 2023-04-23]\n"
                 + "[2, kate, 18, 10, 2023-04-24]\n"
                 + "[3, lily, 22, 11, 2023-04-24]\n"
                 + "[4, lucy, 25, 12, 2023-04-25]\n"
                 + "[5, jack, 26, 13, 2023-04-26]");
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, true,
+                "[1, jim, 20, 10, 2023-04-23]\n"
+                        + "[2, kate, 18, 10, 2023-04-24]\n"
+                        + "[3, lily, 22, 11, 2023-04-24]\n"
+                        + "[4, lucy, 25, 12, 2023-04-25]\n"
+                        + "[5, jack, 26, 13, 2023-04-26]");
     }
 
     private void checkReadHive(String ddl, String inserts, StructType 
dataSchema,
                                StructType partitionSchema,
+                               boolean isStream,
                                String expectResult) throws IOException {
         executeHiveSql("Drop table if exists hive_user");
         executeHiveSql(ddl);
@@ -201,6 +224,12 @@ public class HiveTableSourceTest extends BaseHiveTest {
         List<Row> readRows = new ArrayList<>();
         for (Partition partition : partitions) {
             LOGGER.info("partition: {}", partition.getName());
+            AbstractFetchWindow window;
+            if (isStream) {
+                window = new SizeFetchWindow(1, Long.MAX_VALUE);
+            } else {
+                window = new AllFetchWindow(1);
+            }
             FetchData<Row> fetchData = hiveTableSource.fetch(partition, 
Optional.empty(),
                 new AllFetchWindow(1));
             Iterator<Row> rowIterator = fetchData.getDataIterator();
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
new file mode 100644
index 00000000..af57d249
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.query;
+
+import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
+import org.testng.annotations.Test;
+
+public class HiveSourceTest {
+
+    @Test(enabled = false)
+    public void testHiveSource_001() throws Exception {
+        QueryTester
+                .build()
+                
.withConfig(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT.getKey(), 1)
+                .withQueryPath("/query/hive_source_001.sql")
+                .execute()
+                .checkSinkResult();
+    }
+}
diff --git 
a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql
 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql
new file mode 100644
index 00000000..6f351179
--- /dev/null
+++ 
b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS v_person (
+    src bigint,
+    dst bigint
+) WITH (
+    type='hive',
+    geaflow.dsl.hive.database.name='db1',
+    geaflow.dsl.hive.table.name='test',
+    geaflow.dsl.window.size='1000000',
+    geaflow.dsl.hive.metastore.uris='thrift://localhost:9083'
+);
+
+
+CREATE TABLE IF NOT EXISTS tbl_result (
+    src bigint,
+    dst bigint
+) WITH (
+    type='file',
+    geaflow.dsl.file.path='${target}'
+);
+
+INSERT INTO tbl_result
+SELECT
+    src,
+    dst
+FROM v_person
+;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to