This is an automated email from the ASF dual-hosted git repository.

liyang pushed a commit to branch kylin5
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit d3b4469484a79102071db26114f6602b30d70f43
Author: Liang.Hua <36814772+jacob...@users.noreply.github.com>
AuthorDate: Fri Jul 21 16:31:39 2023 +0800

    KYLIN-5767 Calculate total rows wrongly when connecting jdbc datasource
    
    ---------
    Co-authored-by: liang.hua <liang....@kyligence.io>
---
 .../kylin/engine/spark/NSparkCubingEngine.java     |  4 ++++
 .../engine/spark/builder/SnapshotBuilder.scala     |  7 +++++--
 .../engine/spark/job/NSparkCubingJobTest.java      |  8 ++++++++
 .../kylin/source/jdbc/DefaultSourceConnector.java  |  5 +++++
 .../apache/kylin/source/jdbc/ISourceConnector.java |  3 +++
 .../apache/kylin/source/jdbc/JdbcSourceInput.java  | 11 ++++++++++
 .../kylin/source/jdbc/JdbcSourceInputTest.java     | 24 ++++++++++++++++++++++
 7 files changed, 60 insertions(+), 2 deletions(-)

diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java
index 00078a86ad..76a2bcb9ff 100644
--- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java
+++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java
@@ -50,6 +50,10 @@ public class NSparkCubingEngine implements NCubingEngine {
          * @return the Dataset<Row>, its schema consists of table column's name, for example, [column1,column2,column3]
          */
         Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> parameters);
+
+        default Long getSourceDataCount(TableDesc table, SparkSession ss, Map<String, String> parameters) {
+            return getSourceData(table, ss, parameters).count();
+        }
     }
 
     public interface NSparkCubingStorage {
diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
index e217a31c99..1a9ef63544 100644
--- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
+++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala
@@ -199,8 +199,11 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable {
      case e: Throwable => logWarning(s"Calculate table ${tableDesc.getIdentity}'s total rows exception", e)
    }
    logInfo(s"Calculate table ${tableDesc.getIdentity}'s total rows from source data")
-    val sourceData = getSourceData(ss, tableDesc)
-    val totalRows = sourceData.count()
+    val params = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv)
+      .getProject(tableDesc.getProject).getLegalOverrideKylinProps
+    val totalRows = SourceFactory
+      .createEngineAdapter(tableDesc, classOf[NSparkCubingEngine.NSparkCubingSource])
+      .getSourceDataCount(tableDesc, ss, params)
     logInfo(s"Table ${tableDesc.getIdentity}'s total rows is ${totalRows}'")
     totalRows
   }
diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java
index b86eb333f3..2078e44056 100644
--- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java
+++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java
@@ -163,6 +163,14 @@ public class NSparkCubingJobTest extends NLocalWithSparkSessionTest {
         getLookTables(df).forEach(table -> Assert.assertNotNull(table.getLastSnapshotPath()));
     }
 
+    @Test
+    public void testCalculateTableTotalRows() {
+        NTableMetadataManager tableMetadataManager = NTableMetadataManager.getInstance(config, getProject());
+        TableDesc tableDesc = tableMetadataManager.getTableDesc("DEFAULT.TEST_ORDER");
+        long totalRows = new SnapshotBuilder().calculateTableTotalRows(null, tableDesc, ss);
+        Assert.assertEquals(5000, totalRows);
+    }
+
     private Set<TableDesc> getLookTables(NDataflow df) {
         return df.getModel().getLookupTables().stream().map(TableRef::getTableDesc).collect(Collectors.toSet());
     }
diff --git a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java
index 1ea35704ed..1723810955 100644
--- a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java
+++ b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java
@@ -49,4 +49,9 @@ public class DefaultSourceConnector extends DefaultAdaptor implements ISourceCon
                 .option("driver", driver).option("query", sql).options(params).load();
     }
     }
 
+    @Override
+    public Dataset<Row> getCountData(KylinConfig kylinConfig, SparkSession sparkSession, String sql, Map<String, String> params) {
+        return getSourceData(kylinConfig, sparkSession, sql, params);
+    }
+
 }
diff --git a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java
index e2d91bc0e5..b4ab3b8b5a 100644
--- a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java
+++ b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java
@@ -27,4 +27,7 @@ import org.apache.spark.sql.SparkSession;
 public interface ISourceConnector {
     Dataset<Row> getSourceData(KylinConfig kylinConfig, SparkSession sparkSession, String sql,
             Map<String, String> params);
+
+    Dataset<Row> getCountData(KylinConfig kylinConfig, SparkSession sparkSession, String sql,
+                               Map<String, String> params);
 }
diff --git a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java
index b6d2cf8557..608e85fa24 100644
--- a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java
+++ b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java
@@ -62,4 +62,15 @@ public class JdbcSourceInput implements NSparkCubingEngine.NSparkCubingSource {
         StructType sparkSchema = dataset.schema();
         return dataset.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema));
     }
+
+    @Override
+    public Long getSourceDataCount(TableDesc table, SparkSession ss, Map<String, String> parameters) {
+        String sql = String.format(Locale.ROOT, "select count(*) from %s", table.getIdentity());
+        KylinConfig config = table.getConfig();
+        ISourceConnector connector = (ISourceConnector) ClassUtil.newInstance(config.getJdbcSourceConnector());
+        parameters.put("table", table.getIdentity());
+        log.info("Use source connector: {}, sql: {} ", connector.getClass().getCanonicalName(), sql);
+        Dataset<Row> dataset = connector.getCountData(config, ss, sql, parameters);
+        return dataset.first().getLong(0);
+    }
 }
diff --git a/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java b/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java
index 24e11d57a1..2b205c531b 100644
--- a/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java
+++ b/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.model.NTableMetadataManager;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -63,4 +64,27 @@ public class JdbcSourceInputTest extends JdbcTestBase {
         Dataset<Row> sourceData = cubingSource.getSourceData(tableDesc, ss, Maps.newHashMap());
         System.out.println(sourceData.schema());
     }
+
+    @Test
+    public void testGetSourceDataCount() {
+        NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "ssb");
+        TableDesc tableDesc = tableMgr.getTableDesc("SSB.CUSTOMER");
+        ISource source = SourceFactory.getSource(new ISourceAware() {
+            @Override
+            public int getSourceType() {
+                return ISourceAware.ID_JDBC;
+            }
+
+            @Override
+            public KylinConfig getConfig() {
+                return getTestConfig();
+            }
+        });
+        NSparkCubingEngine.NSparkCubingSource cubingSource = source
+                .adaptToBuildEngine(NSparkCubingEngine.NSparkCubingSource.class);
+        Assert.assertTrue(cubingSource instanceof JdbcSourceInput);
+        Long countData = cubingSource.getSourceDataCount(tableDesc, ss, Maps.newHashMap());
+        Long expectedCount = cubingSource.getSourceData(tableDesc, ss, Maps.newHashMap()).count();
+        Assert.assertEquals(expectedCount, countData);
+    }
 }
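
For readers skimming the patch, a minimal standalone sketch (not part of the commit) of the pattern it introduces: instead of loading the whole JDBC table into Spark and calling count(), the row count is pushed down to the database as a "select count(*)" query and read back from the single-row result, as JdbcSourceInput.getSourceDataCount does via ISourceConnector.getCountData above. The url, driver, and table parameters here are hypothetical placeholders, and plain Spark JDBC options stand in for Kylin's connector plumbing.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CountPushdownSketch {

    // Old behaviour: materialize every row in Spark just to count them.
    static long countByFullScan(SparkSession ss, String url, String driver, String table) {
        Dataset<Row> rows = ss.read().format("jdbc")
                .option("url", url).option("driver", driver)
                .option("dbtable", table).load();
        return rows.count(); // Spark-side count over the whole table
    }

    // New behaviour: let the database evaluate count(*) and return one row.
    static long countByPushdown(SparkSession ss, String url, String driver, String table) {
        String sql = "select count(*) from " + table;
        Dataset<Row> one = ss.read().format("jdbc")
                .option("url", url).option("driver", driver)
                .option("query", sql).load();
        return one.first().getLong(0); // single row, single column
    }
}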
