This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d393d2a82f [Fix][Connector-V2][Hbase] Fix source reader only scanning first split (#10287)
d393d2a82f is described below

commit d393d2a82f2ac39f7a6817c8aa43a363037c4012
Author: yzeng1618 <[email protected]>
AuthorDate: Wed Jan 7 22:04:36 2026 +0800

    [Fix][Connector-V2][Hbase] Fix source reader only scanning first split (#10287)
---
 .../seatunnel/hbase/source/HbaseSourceReader.java  |  40 ++++---
 .../hbase/source/HbaseSourceReaderTest.java        | 122 +++++++++++++++++++++
 2 files changed, 145 insertions(+), 17 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
index aa64812632..4bf1fec791 100644
--- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
+++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hbase.source;
 
+import 
org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
 import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
 import org.apache.seatunnel.shade.com.google.common.collect.Maps;
 
@@ -62,10 +63,22 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
 
     private HBaseDeserializationFormat hbaseDeserializationFormat =
             new HBaseDeserializationFormat();
-    private ResultScanner currentScanner;
 
     public HbaseSourceReader(
             HbaseParameters hbaseParameters, Context context, SeaTunnelRowType 
seaTunnelRowType) {
+        this(
+                hbaseParameters,
+                context,
+                seaTunnelRowType,
+                HbaseClient.createInstance(hbaseParameters));
+    }
+
+    @VisibleForTesting
+    HbaseSourceReader(
+            HbaseParameters hbaseParameters,
+            Context context,
+            SeaTunnelRowType seaTunnelRowType,
+            HbaseClient hbaseClient) {
         this.hbaseParameters = hbaseParameters;
         this.context = context;
         this.seaTunnelRowType = seaTunnelRowType;
@@ -82,7 +95,7 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
                                 Preconditions.checkArgument(
                                         column.contains(":") && 
column.split(":").length == 2,
                                         "Invalid column names, it should be 
[ColumnFamily:Column] format"));
-        hbaseClient = HbaseClient.createInstance(hbaseParameters);
+        this.hbaseClient = hbaseClient;
     }
 
     @Override
@@ -92,13 +105,6 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
 
     @Override
     public void close() throws IOException {
-        if (this.currentScanner != null) {
-            try {
-                this.currentScanner.close();
-            } catch (Exception e) {
-                throw new IOException("Failed to close HBase Scanner.", e);
-            }
-        }
         if (this.hbaseClient != null) {
             try {
                 this.hbaseClient.close();
@@ -115,14 +121,14 @@ public class HbaseSourceReader implements SourceReader<SeaTunnelRow, HbaseSource
             final HbaseSourceSplit split = sourceSplits.poll();
             if (Objects.nonNull(split)) {
                 // read logic
-                if (currentScanner == null) {
-                    currentScanner = hbaseClient.scan(split, hbaseParameters, 
this.columnNames);
-                }
-                for (Result result : currentScanner) {
-                    SeaTunnelRow seaTunnelRow =
-                            hbaseDeserializationFormat.deserialize(
-                                    convertRawRow(result), seaTunnelRowType);
-                    output.collect(seaTunnelRow);
+                try (ResultScanner scanner =
+                        hbaseClient.scan(split, hbaseParameters, 
this.columnNames)) {
+                    for (Result result : scanner) {
+                        SeaTunnelRow seaTunnelRow =
+                                hbaseDeserializationFormat.deserialize(
+                                        convertRawRow(result), 
seaTunnelRowType);
+                        output.collect(seaTunnelRow);
+                    }
                 }
             } else if (noMoreSplit && sourceSplits.isEmpty()) {
                 // signal to the source that we have reached the end of the 
data.
diff --git a/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReaderTest.java b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReaderTest.java
new file mode 100644
index 0000000000..b98b330193
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hbase/src/test/java/org/apache/seatunnel/connectors/seatunnel/hbase/source/HbaseSourceReaderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hbase.source;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.hbase.client.HbaseClient;
+import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class HbaseSourceReaderTest {
+
+    private static class CountingCollector implements Collector<SeaTunnelRow> {
+        private final Object checkpointLock = new Object();
+        private int count;
+
+        @Override
+        public void collect(SeaTunnelRow record) {
+            count++;
+        }
+
+        @Override
+        public Object getCheckpointLock() {
+            return checkpointLock;
+        }
+
+        public int getCount() {
+            return count;
+        }
+    }
+
+    @Test
+    void testPollNextReadsAllSplits() throws Exception {
+        HbaseParameters hbaseParameters = mock(HbaseParameters.class);
+        when(hbaseParameters.getTable()).thenReturn("test_table");
+
+        SourceReader.Context readerContext = mock(SourceReader.Context.class);
+        HbaseClient hbaseClient = mock(HbaseClient.class);
+
+        SeaTunnelRowType seaTunnelRowType =
+                new SeaTunnelRowType(
+                        new String[] {"rowkey", "cf1:id", "cf1:name"},
+                        new SeaTunnelDataType[] {
+                            BasicType.STRING_TYPE, BasicType.STRING_TYPE, 
BasicType.STRING_TYPE
+                        });
+
+        HbaseSourceReader reader =
+                new HbaseSourceReader(
+                        hbaseParameters, readerContext, seaTunnelRowType, 
hbaseClient);
+
+        HbaseSourceSplit split0 = new HbaseSourceSplit(0, Bytes.toBytes("a"), 
Bytes.toBytes("b"));
+        HbaseSourceSplit split1 = new HbaseSourceSplit(1, Bytes.toBytes("b"), 
Bytes.toBytes("c"));
+
+        Result result0 = mock(Result.class);
+        when(result0.getRow()).thenReturn(Bytes.toBytes("row0"));
+        when(result0.getValue(any(byte[].class), any(byte[].class)))
+                .thenReturn(Bytes.toBytes("v0"));
+
+        Result result1 = mock(Result.class);
+        when(result1.getRow()).thenReturn(Bytes.toBytes("row1"));
+        when(result1.getValue(any(byte[].class), any(byte[].class)))
+                .thenReturn(Bytes.toBytes("v1"));
+
+        ResultScanner scanner0 = mock(ResultScanner.class);
+        
when(scanner0.iterator()).thenReturn(Arrays.asList(result0).iterator());
+        ResultScanner scanner1 = mock(ResultScanner.class);
+        
when(scanner1.iterator()).thenReturn(Arrays.asList(result1).iterator());
+
+        when(hbaseClient.scan(eq(split0), eq(hbaseParameters), 
anyList())).thenReturn(scanner0);
+        when(hbaseClient.scan(eq(split1), eq(hbaseParameters), 
anyList())).thenReturn(scanner1);
+
+        reader.addSplits(Arrays.asList(split0, split1));
+        reader.handleNoMoreSplits();
+
+        CountingCollector collector = new CountingCollector();
+        reader.pollNext(collector);
+        reader.pollNext(collector);
+        reader.pollNext(collector);
+
+        assertEquals(2, collector.getCount());
+        verify(hbaseClient, times(1)).scan(eq(split0), eq(hbaseParameters), 
anyList());
+        verify(hbaseClient, times(1)).scan(eq(split1), eq(hbaseParameters), 
anyList());
+        verify(scanner0, times(1)).close();
+        verify(scanner1, times(1)).close();
+        verify(readerContext, times(1)).signalNoMoreElement();
+    }
+}

Reply via email to