This is an automated email from the ASF dual-hosted git repository. liyang pushed a commit to branch kylin5 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit d3b4469484a79102071db26114f6602b30d70f43 Author: Liang.Hua <36814772+jacob...@users.noreply.github.com> AuthorDate: Fri Jul 21 16:31:39 2023 +0800 KYLIN-5767 Calculate total rows wrongly when connecting jdbc datasource --------- Co-authored-by: liang.hua <liang....@kyligence.io> --- .../kylin/engine/spark/NSparkCubingEngine.java | 4 ++++ .../engine/spark/builder/SnapshotBuilder.scala | 7 +++++-- .../engine/spark/job/NSparkCubingJobTest.java | 8 ++++++++ .../kylin/source/jdbc/DefaultSourceConnector.java | 5 +++++ .../apache/kylin/source/jdbc/ISourceConnector.java | 3 +++ .../apache/kylin/source/jdbc/JdbcSourceInput.java | 11 ++++++++++ .../kylin/source/jdbc/JdbcSourceInputTest.java | 24 ++++++++++++++++++++++ 7 files changed, 60 insertions(+), 2 deletions(-) diff --git a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java index 00078a86ad..76a2bcb9ff 100644 --- a/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java +++ b/src/spark-project/engine-spark/src/main/java/org/apache/kylin/engine/spark/NSparkCubingEngine.java @@ -50,6 +50,10 @@ public class NSparkCubingEngine implements NCubingEngine { * @return the Dataset<Row>, its schema consists of table column's name, for example, [column1,column2,column3] */ Dataset<Row> getSourceData(TableDesc table, SparkSession ss, Map<String, String> parameters); + + default Long getSourceDataCount(TableDesc table, SparkSession ss, Map<String, String> parameters) { + return getSourceData(table, ss, parameters).count(); + } } public interface NSparkCubingStorage { diff --git a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala index e217a31c99..1a9ef63544 100644 --- a/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala +++ b/src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/SnapshotBuilder.scala @@ -199,8 +199,11 @@ class SnapshotBuilder(var jobId: String) extends Logging with Serializable { case e: Throwable => logWarning(s"Calculate table ${tableDesc.getIdentity}'s total rows exception", e) } logInfo(s"Calculate table ${tableDesc.getIdentity}'s total rows from source data") - val sourceData = getSourceData(ss, tableDesc) - val totalRows = sourceData.count() + val params = NProjectManager.getInstance(KylinConfig.getInstanceFromEnv) + .getProject(tableDesc.getProject).getLegalOverrideKylinProps + val totalRows = SourceFactory + .createEngineAdapter(tableDesc, classOf[NSparkCubingEngine.NSparkCubingSource]) + .getSourceDataCount(tableDesc, ss, params) logInfo(s"Table ${tableDesc.getIdentity}'s total rows is ${totalRows}'") totalRows } diff --git a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java index b86eb333f3..2078e44056 100644 --- a/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java +++ b/src/spark-project/engine-spark/src/test/java/org/apache/kylin/engine/spark/job/NSparkCubingJobTest.java @@ -163,6 +163,14 @@ public class NSparkCubingJobTest extends NLocalWithSparkSessionTest { getLookTables(df).forEach(table -> Assert.assertNotNull(table.getLastSnapshotPath())); } + @Test + public void testCalculateTableTotalRows() { + NTableMetadataManager tableMetadataManager = NTableMetadataManager.getInstance(config, getProject()); + TableDesc tableDesc = tableMetadataManager.getTableDesc("DEFAULT.TEST_ORDER"); + long totalRows = new SnapshotBuilder().calculateTableTotalRows(null, tableDesc, ss); + Assert.assertEquals(5000, totalRows); + } + private Set<TableDesc> getLookTables(NDataflow df) { return df.getModel().getLookupTables().stream().map(TableRef::getTableDesc).collect(Collectors.toSet()); } diff --git a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java index 1ea35704ed..1723810955 100644 --- a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java +++ b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/DefaultSourceConnector.java @@ -49,4 +49,9 @@ public class DefaultSourceConnector extends DefaultAdaptor implements ISourceCon .option("driver", driver).option("query", sql).options(params).load(); } + @Override + public Dataset<Row> getCountData(KylinConfig kylinConfig, SparkSession sparkSession, String sql, Map<String, String> params) { + return getSourceData(kylinConfig, sparkSession, sql, params); + } + } diff --git a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java index e2d91bc0e5..b4ab3b8b5a 100644 --- a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java +++ b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/ISourceConnector.java @@ -27,4 +27,7 @@ import org.apache.spark.sql.SparkSession; public interface ISourceConnector { Dataset<Row> getSourceData(KylinConfig kylinConfig, SparkSession sparkSession, String sql, Map<String, String> params); + + Dataset<Row> getCountData(KylinConfig kylinConfig, SparkSession sparkSession, String sql, + Map<String, String> params); } diff --git a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java index b6d2cf8557..608e85fa24 100644 --- a/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java +++ b/src/spark-project/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSourceInput.java @@ -62,4 +62,15 @@ public class JdbcSourceInput implements NSparkCubingEngine.NSparkCubingSource { StructType sparkSchema = dataset.schema(); return dataset.select(SparderTypeUtil.alignDataTypeAndName(sparkSchema, kylinSchema)); } + + @Override + public Long getSourceDataCount(TableDesc table, SparkSession ss, Map<String, String> parameters) { + String sql = String.format(Locale.ROOT, "select count(*) from %s", table.getIdentity()); + KylinConfig config = table.getConfig(); + ISourceConnector connector = (ISourceConnector) ClassUtil.newInstance(config.getJdbcSourceConnector()); + parameters.put("table", table.getIdentity()); + log.info("Use source connector: {}, sql: {} ", connector.getClass().getCanonicalName(), sql); + Dataset<Row> dataset = connector.getCountData(config, ss, sql, parameters); + return dataset.first().getLong(0); + } } diff --git a/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java b/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java index 24e11d57a1..2b205c531b 100644 --- a/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java +++ b/src/spark-project/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/JdbcSourceInputTest.java @@ -29,6 +29,7 @@ import org.apache.kylin.metadata.model.NTableMetadataManager; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -63,4 +64,27 @@ public class JdbcSourceInputTest extends JdbcTestBase { Dataset<Row> sourceData = cubingSource.getSourceData(tableDesc, ss, Maps.newHashMap()); System.out.println(sourceData.schema()); } + + @Test + public void testGetSourceDataCount() { + NTableMetadataManager tableMgr = NTableMetadataManager.getInstance(getTestConfig(), "ssb"); + TableDesc tableDesc = tableMgr.getTableDesc("SSB.CUSTOMER"); + ISource source = SourceFactory.getSource(new ISourceAware() { + @Override + public int getSourceType() { + return ISourceAware.ID_JDBC; + } + + @Override + public KylinConfig getConfig() { + return getTestConfig(); + } + }); + NSparkCubingEngine.NSparkCubingSource cubingSource = source + .adaptToBuildEngine(NSparkCubingEngine.NSparkCubingSource.class); + Assert.assertTrue(cubingSource instanceof JdbcSourceInput); + Long countData = cubingSource.getSourceDataCount(tableDesc, ss, Maps.newHashMap()); + Long expectedCount = cubingSource.getSourceData(tableDesc, ss, Maps.newHashMap()).count(); + Assert.assertEquals(expectedCount, countData); + } }