This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch kylin-on-parquet-v2 in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 656d9f3c923b18610aa93a9975a5eb6a9f2d8ec8 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Sun Aug 23 22:54:32 2020 +0800 KYLIN-4660 Cleanup for IT and logger --- .../java/org/apache/kylin/common/KylinConfig.java | 25 +++-- .../org/apache/kylin/common/KylinConfigBase.java | 2 +- .../kylin/common/persistence/ResourceTool.java | 2 +- .../kylin/common/util/AbstractKylinTestCase.java | 5 - .../kylin/common/util/TempMetadataBuilder.java | 18 +++- .../java/org/apache/kylin/cube/model/CubeDesc.java | 17 +--- .../model/validation/rule/FunctionRuleTest.java | 17 ---- .../apache/kylin/measure/MeasureTypeFactory.java | 8 +- .../parquet_test/cube_desc/ci_left_join_cube.json | 21 ++-- .../resources/query/sql_h2_uncapable/query03.sql | 5 +- .../resources/query/sql_h2_uncapable/query04.sql | 6 +- .../sql/execution/datasource/FilePruner.scala | 2 +- .../main/scala/org/apache/spark/utils/LogEx.scala | 2 +- .../kylin/engine/spark/source/CsvSource.java | 2 +- .../kylin/engine/spark/job/CubeBuildJob.java | 2 +- .../kylin/engine/spark/utils/JobMetricsUtils.scala | 2 +- .../engine/spark/LocalWithSparkSessionTest.java | 24 ++--- .../src/test/resources/log4j.properties | 11 ++- .../apache/kylin/query/runtime/SparkEngine.java | 8 +- .../kylin/query/runtime/plans/ProjectPlan.scala | 2 +- ...ldAndQueryTest.java => NBuildAndQueryTest.java} | 106 +++++++-------------- .../apache/kylin/engine/spark2/NExecAndComp.java | 38 ++++---- .../spark2/NManualBuildAndQueryCuboidTest.java | 4 +- .../kylin/engine/spark2/utils/QueryUtil.java | 24 ++--- .../apache/kylin/query/routing/RoutingRule.java | 2 +- 25 files changed, 165 insertions(+), 190 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java index efeceba..8562314 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java @@ -575,22 +575,33 @@ public class KylinConfig extends KylinConfigBase { return super.hashCode(); } - // Only used in test cases!!! + /** + * This is only used in test. + * + * 1. Load metadata from localMetaDir + * 2. Prepare a temp working-dir + */ public static void setKylinConfigForLocalTest(String localMetaDir) { synchronized (KylinConfig.class) { - if (new File(localMetaDir, "kylin.properties").exists() == false) + if (!new File(localMetaDir, "kylin.properties").exists()) throw new IllegalArgumentException(localMetaDir + " is not a valid local meta dir"); destroyInstance(); - logger.info("Setting KylinConfig to " + localMetaDir); + String canonicalPath = localMetaDir; + try { + // remove the ".." in path string + canonicalPath = new File(localMetaDir).getCanonicalPath(); + } catch (IOException e) { + throw new IllegalStateException(""); + } - System.setProperty(KylinConfig.KYLIN_CONF, localMetaDir); + logger.info("Setting KylinConfig to {} in UT.", canonicalPath); + System.setProperty(KylinConfig.KYLIN_CONF, canonicalPath); KylinConfig config = KylinConfig.getInstanceFromEnv(); - config.setMetadataUrl(localMetaDir); + config.setMetadataUrl(canonicalPath); - // make sure a local working directory - File workingDir = new File(localMetaDir, "working-dir"); + File workingDir = new File(canonicalPath, "working-dir"); workingDir.mkdirs(); String path = workingDir.getAbsolutePath(); if (!path.startsWith("/")) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index be7eeb4..f317ba2 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2819,7 +2819,7 @@ public abstract class KylinConfigBase implements Serializable { } public int getSparkSqlShufflePartitions() { - return Integer.valueOf(getOptional("kylin.query.spark-engine.spark-sql-shuffle-partitions", "-1")); + return Integer.parseInt(getOptional("kylin.query.spark-engine.spark-sql-shuffle-partitions", "-1")); } public Map<String, String> getSparkConf() { diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java index 2dc0984..dd5c8da 100644 --- a/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java +++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ResourceTool.java @@ -271,7 +271,7 @@ public class ResourceTool { try { RawResource res = src.getResource(path); if (res != null) { - logger.info("Copy path: {} from {} to {}", path, src, dst); + logger.debug("Copy path: {} from {} to {}", path, src, dst); try { dst.putResource(path, res.content(), res.lastModified()); } finally { diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java index 7cbf5b6..af6beec 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java @@ -26,10 +26,6 @@ import org.apache.kylin.common.KylinConfig; */ public abstract class AbstractKylinTestCase { - static { - System.setProperty("needCheckCC", "true"); - } - public abstract void createTestMetadata(String... overlayMetadataDirs) throws Exception; public abstract void cleanupTestMetadata() throws Exception; @@ -42,5 +38,4 @@ public abstract class AbstractKylinTestCase { System.clearProperty(KylinConfig.KYLIN_CONF); KylinConfig.destroyInstance(); } - } diff --git a/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java b/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java index 3653bf3..b804deb 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/TempMetadataBuilder.java @@ -30,22 +30,31 @@ import java.util.List; public class TempMetadataBuilder { - public static final String N_KAP_META_TEST_DATA = "../examples/test_case_data/parquet_test"; + /** + * Provided metadata for Parquet Storage + */ public static final String N_SPARK_PROJECT_KYLIN_META_TEST_DATA = "../../examples/test_case_data/parquet_test"; + public static final String N_SPARK_PROJECT_KYLIN_META_TEST_DATA_2 = "../examples/test_case_data/parquet_test"; + + /** + * Temporary metadata dir + */ public static final String TEMP_TEST_METADATA = "../examples/test_metadata"; private static final Logger logger = LoggerFactory.getLogger(TempMetadataBuilder.class); + /** + * @see TempMetadataBuilder#TEMP_TEST_METADATA + */ public static String prepareNLocalTempMetadata() { return prepareNLocalTempMetadata(false); } public static String prepareNLocalTempMetadata(boolean debug) { - // for spark-project if (new File(N_SPARK_PROJECT_KYLIN_META_TEST_DATA).exists()) { return new TempMetadataBuilder(debug, "../" + TEMP_TEST_METADATA, N_SPARK_PROJECT_KYLIN_META_TEST_DATA).build(); } - return new TempMetadataBuilder(debug, TEMP_TEST_METADATA, N_KAP_META_TEST_DATA).build(); + return new TempMetadataBuilder(debug, TEMP_TEST_METADATA, N_SPARK_PROJECT_KYLIN_META_TEST_DATA_2).build(); } public static String prepareNLocalTempMetadata(boolean debug, String overlay) { @@ -67,7 +76,8 @@ public class TempMetadataBuilder { this.dst = dst; } - public String build() { + private String build() { + logger.debug("Prepare temp metadata for ut/it ."); if ("true".equals(System.getProperty("skipMetaPrep"))) { return dst; } diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 84048e9..6eb04eb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -662,11 +662,11 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { throw new RuntimeException("Error during adapting hbase mapping", e); } } else { - // skip at Kylin 4.0 -// if (hbaseMapping != null) { -// hbaseMapping.init(this); -// initMeasureReferenceToColumnFamily(); -// } + // to be removed in Kylin 4.0 + if (hbaseMapping != null) { + hbaseMapping.init(this); + initMeasureReferenceToColumnFamily(); + } } // check all dimension columns are presented on rowkey @@ -1120,13 +1120,6 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { checkState(!measureSet.contains(colMeasureRefs[i]), "measure (%s) duplicates", colMeasureRefs[i]); measureSet.add(colMeasureRefs[i]); - - if (storageType > IStorageAware.ID_SHARDED_HBASE) { - checkState(measureIndex[i] > lastMeasureIndex, "measure (%s) is not in order", - colMeasureRefs[i]); - lastMeasureIndex = measureIndex[i]; - } - checkEachMeasureExist.set(measureIndex[i]); } c.setMeasures(measureDescs); diff --git a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/FunctionRuleTest.java b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/FunctionRuleTest.java index 5368e16..f8bf36d 100644 --- a/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/FunctionRuleTest.java +++ b/core-cube/src/test/java/org/apache/kylin/cube/model/validation/rule/FunctionRuleTest.java @@ -23,15 +23,11 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.util.List; - -import com.google.common.collect.Lists; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.cube.model.validation.ValidateContext; -import org.apache.kylin.metadata.model.MeasureDesc; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -62,17 +58,4 @@ public class FunctionRuleTest extends LocalFileMetadataTestCase { vContext.print(System.out); assertTrue(vContext.getResults().length == 0); } - - @Test(expected = IllegalStateException.class) - public void testValidateMeasureNamesDuplicated() throws IOException { - File f = new File(LocalFileMetadataTestCase.LOCALMETA_TEST_DATA + "/cube_desc/ssb.json"); - CubeDesc desc = JsonUtil.readValue(new FileInputStream(f), CubeDesc.class); - - MeasureDesc measureDescDuplicated = desc.getMeasures().get(1); - List<MeasureDesc> newMeasures = Lists.newArrayList(desc.getMeasures()); - newMeasures.add(measureDescDuplicated); - desc.setMeasures(newMeasures); - - desc.init(config); - } } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java index d16a705..ee0ee93 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java @@ -114,13 +114,13 @@ abstract public class MeasureTypeFactory<T> { factoryInsts.add(new PercentileMeasureType.Factory()); factoryInsts.add(new DimCountDistinctMeasureType.Factory()); - logger.info("Checking custom measure types from kylin config"); + logger.trace("Checking custom measure types from kylin config"); try { Map<String, String> customMeasureTypes = KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes(); for (String customFactory : customMeasureTypes.values()) { try { - logger.info("Checking custom measure types from kylin config: " + customFactory); + logger.debug("Checking custom measure types from kylin config: " + customFactory); factoryInsts.add((MeasureTypeFactory<?>) Class.forName(customFactory).newInstance()); } catch (Exception e) { throw new IllegalArgumentException("Unrecognized MeasureTypeFactory classname: " + customFactory, @@ -128,7 +128,7 @@ abstract public class MeasureTypeFactory<T> { } } } catch (KylinConfigCannotInitException e) { - logger.warn("Will not add custome MeasureTypeFactory as KYLIN_CONF nor KYLIN_HOME is set"); + logger.warn("Will not add custom MeasureTypeFactory as KYLIN_CONF nor KYLIN_HOME is set"); } // register factories & data type serializers @@ -143,7 +143,7 @@ abstract public class MeasureTypeFactory<T> { "Aggregation data type name '" + dataTypeName + "' must be in lower case"); Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer(); - logger.info("registering " + funcName + "(" + dataTypeName + "), " + factory.getClass()); + logger.debug("registering " + funcName + "(" + dataTypeName + "), " + factory.getClass()); DataType.register(dataTypeName); DataTypeSerializer.register(dataTypeName, serializer); registerUDAF(factory); diff --git a/examples/test_case_data/parquet_test/cube_desc/ci_left_join_cube.json b/examples/test_case_data/parquet_test/cube_desc/ci_left_join_cube.json index f4d0fd1..f175177 100644 --- a/examples/test_case_data/parquet_test/cube_desc/ci_left_join_cube.json +++ b/examples/test_case_data/parquet_test/cube_desc/ci_left_join_cube.json @@ -225,6 +225,16 @@ "returntype" : "bitmap" } }, { + "name": "GVM_PERCENTILE", + "function": { + "expression": "PERCENTILE_APPROX", + "parameter": { + "type": "column", + "value": "TEST_KYLIN_FACT.PRICE" + }, + "returntype": "percentile(100)" + } + }, { "name" : "TEST_COUNT_COLUMN_CNT", "function" : { "expression" : "COUNT", @@ -333,7 +343,7 @@ "name" : "F1", "columns" : [ { "qualifier" : "M", - "measure_refs" : [ "TRANS_CNT", "ITEM_COUNT_SUM", "GMV_SUM", "GMV_CNT", "GMV_MIN", "GMV_MAX", "TEST_COUNT_COLUMN_CNT" ] + "measure_refs" : [ "TRANS_CNT", "ITEM_COUNT_SUM", "GMV_SUM", "GMV_CNT", "GMV_MIN", "GMV_MAX", "TEST_COUNT_COLUMN_CNT", "GVM_PERCENTILE" ] } ] }, { "name" : "F2", @@ -371,16 +381,9 @@ "engine_type" : 6, "storage_type" : 4, "override_kylin_properties" : { - "kylin.cube.algorithm" : "INMEM", - "kylin.dictionary.shrunken-from-global-enabled" : "true" }, "cuboid_black_list" : [ ], "parent_forward" : 3, "mandatory_dimension_set_list" : [ ], - "snapshot_table_desc_list" : [ { - "table_name" : "DEFAULT.TEST_CATEGORY_GROUPINGS", - "storage_type" : "hbase", - "local_cache_enable" : true, - "global" : true - } ] + "snapshot_table_desc_list" : [ ] } \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_h2_uncapable/query03.sql b/kylin-it/src/test/resources/query/sql_h2_uncapable/query03.sql index ee89723..5455a05 100644 --- a/kylin-it/src/test/resources/query/sql_h2_uncapable/query03.sql +++ b/kylin-it/src/test/resources/query/sql_h2_uncapable/query03.sql @@ -1 +1,4 @@ -select "TEST_KYLIN_FACT"."CAL_DT" as "CAL_DT" from "DEFAULT"."TEST_KYLIN_FACT" "TEST_KYLIN_FACT" where ("TEST_KYLIN_FACT"."CAL_DT" ) >= DATE'2013-01-07' + interval '3' day group by "TEST_KYLIN_FACT"."CAL_DT" order by "TEST_KYLIN_FACT"."CAL_DT" \ No newline at end of file +select "TEST_KYLIN_FACT"."CAL_DT" as "CAL_DT" +from "DEFAULT"."TEST_KYLIN_FACT" "TEST_KYLIN_FACT" +where ("TEST_KYLIN_FACT"."CAL_DT" ) >= DATE'2013-01-07' + interval '3' day +group by "TEST_KYLIN_FACT"."CAL_DT" order by "TEST_KYLIN_FACT"."CAL_DT" \ No newline at end of file diff --git a/kylin-it/src/test/resources/query/sql_h2_uncapable/query04.sql b/kylin-it/src/test/resources/query/sql_h2_uncapable/query04.sql index 25fd2bf..5ece9c9 100644 --- a/kylin-it/src/test/resources/query/sql_h2_uncapable/query04.sql +++ b/kylin-it/src/test/resources/query/sql_h2_uncapable/query04.sql @@ -1 +1,5 @@ -select "TEST_KYLIN_FACT"."CAL_DT" as "CAL_DT" from "DEFAULT"."TEST_KYLIN_FACT" "TEST_KYLIN_FACT" where ("TEST_KYLIN_FACT"."CAL_DT" + interval '3' day ) >= DATE'2013-01-07' group by "TEST_KYLIN_FACT"."CAL_DT" order by "TEST_KYLIN_FACT"."CAL_DT" \ No newline at end of file +select "TEST_KYLIN_FACT"."CAL_DT" as "CAL_DT" +from "DEFAULT"."TEST_KYLIN_FACT" "TEST_KYLIN_FACT" +where ("TEST_KYLIN_FACT"."CAL_DT" + interval '3' day ) >= DATE'2013-01-07' +group by "TEST_KYLIN_FACT"."CAL_DT" +order by "TEST_KYLIN_FACT"."CAL_DT" \ No newline at end of file diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala index 5e4171d..2ee32a0 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/sql/execution/datasource/FilePruner.scala @@ -252,7 +252,7 @@ class FilePruner( logInfo(s"$pruningType pruning in ${(endTime - startTime).toDouble / 1000000} ms") } catch { case th: Throwable => - logError(s"Error occurs when $pruningType, scan all ${pruningType}s.", th) + logError(s"Error occurs when $specFilters, scan all ${pruningType}s.", th) } selected } diff --git a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/LogEx.scala b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/LogEx.scala index 79dd27b..a2ed0e4 100644 --- a/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/LogEx.scala +++ b/kylin-spark-project/kylin-spark-common/src/main/scala/org/apache/spark/utils/LogEx.scala @@ -43,7 +43,7 @@ trait LogEx extends Logging { // If action is quite fast, don't logging if (end - start > 2) { if (info) { - logInfo(s"Run $action take ${end - start} ms") + logDebug(s"Run $action take ${end - start} ms") } else { logTrace(s"Run $action take ${end - start} ms") } diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java index 0d67265..8621bf5 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/source/CsvSource.java @@ -101,7 +101,7 @@ public class CsvSource implements ISource { private String getUtMetaDir() { // this is only meant to be used in UT final String utMetaDir = System.getProperty(KylinConfig.KYLIN_CONF); - if (utMetaDir == null || !utMetaDir.contains("../example")) { + if (utMetaDir == null) { throw new IllegalStateException(); } return utMetaDir; diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java index 40dcb01..d8788cb 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/job/CubeBuildJob.java @@ -345,7 +345,7 @@ public class CubeBuildJob extends SparkApplication { long rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT()); if (rowCount == -1) { infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'"); - logger.warn("Can not get cuboid row cnt, use count() to collect cuboid rows."); + logger.debug("Can not get cuboid row cnt, use count() to collect cuboid rows."); layout.setRows(dataset.count()); layout.setSourceRows(parentDSCnt); } else { diff --git a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala index a8231ac..7b6e558 100644 --- a/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala +++ b/kylin-spark-project/kylin-spark-engine/src/main/scala/org/apache/kylin/engine/spark/utils/JobMetricsUtils.scala @@ -41,7 +41,7 @@ object JobMetricsUtils extends Logging { metrics = collectOutputRows(execution.executedPlan) logInfo(s"Collect output rows successfully. $metrics") } else { - logWarning(s"Collect output rows failed.") + logInfo(s"Collect output rows failed.") } metrics } diff --git a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java index 5e187c2..2b3f480 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java +++ b/kylin-spark-project/kylin-spark-engine/src/test/java/org/apache/kylin/engine/spark/LocalWithSparkSessionTest.java @@ -73,6 +73,9 @@ import java.util.Locale; import java.util.Map; import java.util.UUID; +/** + * Base class for Parquet Storage IT + */ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase implements Serializable { private static final Logger logger = LoggerFactory.getLogger(LocalWithSparkSessionTest.class); private static final String CSV_TABLE_DIR = "../../examples/test_metadata/data/%s.csv"; @@ -84,6 +87,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme @Before public void setup() throws SchedulerException { + logger.info("Prepare temporary data."); overwriteSystemProp("kylin.job.scheduler.poll-interval-second", "1"); overwriteSystemProp("calcite.keep-in-clause", "true"); overwriteSystemProp("kylin.metadata.distributed-lock-impl", "org.apache.kylin.engine.spark.utils.MockedDistributedLock$MockedFactory"); @@ -98,6 +102,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme @After public void after() { DefaultScheduler.destroyInstance(); + logger.info("Clean up temporary data."); this.cleanupTestMetadata(); restoreAllSystemProp(); } @@ -126,22 +131,19 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme ss = SparkSession.builder().config(sparkConf).getOrCreate(); KylinSparkEnv.setSparkSession(ss); UdfManager.create(ss); - - System.out.println("Check spark sql config [spark.sql.catalogImplementation = " - + ss.conf().get("spark.sql.catalogImplementation") + "]"); ss.sparkContext().setLogLevel("WARN"); } - public void createTestMetadata() { + private void createTestMetadata() { + if(System.getProperty("noBuild", "false").equalsIgnoreCase("true")) + return; String tempMetadataDir = TempMetadataBuilder.prepareNLocalTempMetadata(); KylinConfig.setKylinConfigForLocalTest(tempMetadataDir); - getTestConfig().setProperty("kylin.query.security.acl-tcr-enabled", "false"); } - public void createTestMetadata(String metadataDir) { + protected void createTestMetadata(String metadataDir) { String tempMetadataDir = TempMetadataBuilder.prepareNLocalTempMetadata(false, metadataDir); KylinConfig.setKylinConfigForLocalTest(tempMetadataDir); - getTestConfig().setProperty("kylin.query.security.acl-tcr-enabled", "false"); } protected ExecutableState wait(AbstractExecutable job) throws InterruptedException { @@ -204,10 +206,10 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme protected void restoreAllSystemProp() { systemProp.forEach((prop, value) -> { if (value == null) { - logger.info("Clear {}", prop); + logger.trace("Clear {}", prop); System.clearProperty(prop); } else { - logger.info("restore {}", prop); + logger.trace("restore {}", prop); System.setProperty(prop, value); } }); @@ -215,7 +217,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme } protected static void populateSSWithCSVData(KylinConfig kylinConfig, String project, SparkSession sparkSession) { - + logger.debug("Prepare Spark data."); ProjectInstance projectInstance = ProjectManager.getInstance(kylinConfig).getProject(project); Preconditions.checkArgument(projectInstance != null); for (String table : projectInstance.getTables()) { @@ -235,7 +237,7 @@ public class LocalWithSparkSessionTest extends LocalFileMetadataTestCase impleme Dataset<Row> ret = sparkSession.read().schema(schema).csv(String.format(Locale.ROOT, CSV_TABLE_DIR, table)); ret.createOrReplaceTempView(tableDesc.getName()); } - + logger.debug(sparkSession.sql("show tables").showString(20, 50 , false)); } private static DataType convertType(org.apache.kylin.metadata.datatype.DataType type) { diff --git a/kylin-spark-project/kylin-spark-engine/src/test/resources/log4j.properties b/kylin-spark-project/kylin-spark-engine/src/test/resources/log4j.properties index 824aab8..24cf5e0 100644 --- a/kylin-spark-project/kylin-spark-engine/src/test/resources/log4j.properties +++ b/kylin-spark-project/kylin-spark-engine/src/test/resources/log4j.properties @@ -21,10 +21,15 @@ log4j.rootLogger=INFO,stderr log4j.appender.stderr=org.apache.log4j.ConsoleAppender log4j.appender.stderr.Target=System.out log4j.appender.stderr.layout=org.apache.log4j.PatternLayout -#Don't add line number (%L) as it's too costly! -log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} : %m%n +log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n -log4j.logger.org.apache.kylin=DEBUG +# This is set to WARN level to eliminate verbose output in UT/IT, please set to debug level when some case failed. +log4j.logger.org.apache.kylin=WARN +# Set debug level for IT in kylin-spark-test module +log4j.logger.org.apache.kylin.engine.spark2=DEBUG +log4j.logger.org.apache.kylin.common.persistence=INFO +log4j.logger.org.apache.kylin.cube.RawQueryLastHacker=ERROR +log4j.logger.org.apache.kylin.metadata.project.ProjectL2Cache=ERROR log4j.logger.org.springframework=WARN log4j.logger.org.apache.spark=ERROR log4j.logger.org.apache.kylin.engine.spark.builder=WARN diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java index 0b97d7e..6d444b6 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/SparkEngine.java @@ -37,7 +37,7 @@ public class SparkEngine implements QueryEngine { @Override public Enumerable<Object> computeSCALA(DataContext dataContext, RelNode relNode, RelDataType resultType) { Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode); - log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); + log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); return ResultPlan.getResult(sparkPlan, resultType, ResultType.SCALA()).right().get(); } @@ -45,18 +45,18 @@ public class SparkEngine implements QueryEngine { @Override public Enumerable<Object[]> compute(DataContext dataContext, RelNode relNode, RelDataType resultType) { Dataset<Row> sparkPlan = toSparkPlan(dataContext, relNode); - log.debug("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); + log.trace("SPARK LOGICAL PLAN {}", sparkPlan.queryExecution()); return ResultPlan.getResult(sparkPlan, resultType, ResultType.NORMAL()).left().get(); } private Dataset<Row> toSparkPlan(DataContext dataContext, RelNode relNode) { - log.info("Begin planning spark plan."); + log.trace("Begin planning spark plan."); long start = System.currentTimeMillis(); CalciteToSparkPlaner calciteToSparkPlaner = new CalciteToSparkPlaner(dataContext); long t = System.currentTimeMillis(); calciteToSparkPlaner.go(relNode); long takeTime = System.currentTimeMillis() - start; - log.info("Plan take {} ms", takeTime); + log.trace("Plan take {} ms", takeTime); return calciteToSparkPlaner.getResult(); } } diff --git a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala index f1348da..f1ce16e 100644 --- a/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala +++ b/kylin-spark-project/kylin-spark-query/src/main/scala/org/apache/kylin/query/runtime/plans/ProjectPlan.scala @@ -64,7 +64,7 @@ object ProjectPlan extends Logging { }) val prj = df.select(selectedColumns: _*) - logInfo(s"Gen project cost Time :${System.currentTimeMillis() - start} ") + logTrace(s"Gen project cost Time :${System.currentTimeMillis() - start} ") prj } } diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java similarity index 77% rename from kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryTest.java rename to kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java index a07b8d5..20e8643 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryTest.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NBuildAndQueryTest.java @@ -32,7 +32,6 @@ import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.query.routing.Candidate; import org.apache.spark.sql.KylinSparkEnv; import org.apache.kylin.engine.spark2.NExecAndComp.CompareLevel; -import org.apache.spark.sql.SparderContext; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @@ -51,9 +50,8 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @SuppressWarnings("serial") -public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { - - private static final Logger logger = LoggerFactory.getLogger(NManualBuildAndQueryTest.class); +public class NBuildAndQueryTest extends LocalWithSparkSessionTest { + private static final Logger logger = LoggerFactory.getLogger(NBuildAndQueryTest.class); private boolean succeed = true; protected KylinConfig config; @@ -81,26 +79,25 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { } @Test - @Ignore("for developing") - public void testTmp() throws Exception { + @Ignore("Manually verify for developer if `examples/test_metadata` exists.") + public void manualVerifyForDeveloper() throws Exception { final KylinConfig config = KylinConfig.getInstanceFromEnv(); - System.setProperty("noBuild", "true"); - System.setProperty("isDeveloperMode", "true"); - buildCubes(); - populateSSWithCSVData(config, getProject(), SparderContext.getSparkSession()); + populateSSWithCSVData(config, getProject(), KylinSparkEnv.getSparkSession()); List<Pair<String, Throwable>> results = execAndGetResults( Lists.newArrayList(new QueryCallable(CompareLevel.SAME, "left", "temp"))); // report(results); } @Test - public void testBasics() throws Exception { + public void verifySqlStandard() throws Exception { final KylinConfig config = KylinConfig.getInstanceFromEnv(); - + // 1. Kylin side buildCubes(); - // build is done, start to test query + // 2. Spark side populateSSWithCSVData(config, getProject(), KylinSparkEnv.getSparkSession()); + + // 3. Compare Kylin with Spark List<QueryCallable> tasks = prepareAndGenQueryTasks(config); List<Pair<String, Throwable>> results = execAndGetResults(tasks); Assert.assertEquals(results.size(), tasks.size()); @@ -110,10 +107,10 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { private List<Pair<String, Throwable>> execAndGetResults(List<QueryCallable> tasks) throws InterruptedException, java.util.concurrent.ExecutionException { ThreadPoolExecutor executor = new ThreadPoolExecutor(9// - , 9 // - , 1 // - , TimeUnit.DAYS // - , new LinkedBlockingQueue<Runnable>(100)); + , 9 + , 1 + , TimeUnit.DAYS + , new LinkedBlockingQueue<>(100)); CompletionService<Pair<String, Throwable>> service = new ExecutorCompletionService<>(executor); for (QueryCallable task : tasks) { service.submit(task); @@ -142,7 +139,7 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { } private void failFastIfNeeded(Pair<String, Throwable> result) { - if (Boolean.valueOf(System.getProperty("failFast", "false")) && result.getSecond() != null) { + if (Boolean.parseBoolean(System.getProperty("failFast", "false")) && result.getSecond() != null) { logger.error("CI failed on:" + result.getFirst()); Assert.fail(); } @@ -153,7 +150,6 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { List<QueryCallable> tasks = new ArrayList<>(); for (String joinType : joinTypes) { tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql")); -// tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "temp")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_lookup")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_casewhen")); @@ -165,16 +161,12 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_distinct_dim")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_timestamp")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_orderby")); - //tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_snowflake")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_topn")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_join")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_union")); - //tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_hive")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_distinct_precisely")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_powerbi")); - //tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_raw")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_value")); - tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_magine")); tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_cross_join")); // same row count @@ -184,31 +176,21 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_window")); tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_h2_uncapable")); tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_grouping")); - tasks.add(new QueryCallable(CompareLevel.SAME_SQL_COMPARE, joinType, "sql_intersect_count")); - //tasks.add(new QueryCallable(CompareLevel.SAME_SQL_COMPARE, joinType, "sql_percentile")); - tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_distinct")); -// -// //execLimitAndValidate -// // tasks.add(new QueryCallable(CompareLevel.SUBSET, joinType, "sql")); - } + tasks.add(new QueryCallable(CompareLevel.NONE, joinType, "sql_percentile")); - // cc tests -// tasks.add(new QueryCallable(CompareLevel.SAME_SQL_COMPARE, "default", "sql_computedcolumn_common")); -// tasks.add(new QueryCallable(CompareLevel.SAME_SQL_COMPARE, "default", "sql_computedcolumn_leftjoin")); -// -// tasks.add(new QueryCallable(CompareLevel.SAME, "inner", "sql_magine_inner")); -// tasks.add(new QueryCallable(CompareLevel.SAME, "inner", "sql_magine_window")); -// tasks.add(new QueryCallable(CompareLevel.SAME, "default", "sql_rawtable")); -// tasks.add(new QueryCallable(CompareLevel.SAME, "default", "sql_multi_model")); + // HLL is not precise + tasks.add(new QueryCallable(CompareLevel.SAME_ROWCOUNT, joinType, "sql_distinct")); + } logger.info("Total {} tasks.", tasks.size()); return tasks; } public void buildCubes() throws Exception { - if (Boolean.valueOf(System.getProperty("noBuild", "false"))) { - System.out.println("Direct query"); - } else if (Boolean.valueOf(System.getProperty("isDeveloperMode", "false"))) { -// fullBuildCube("ci_inner_join_cube"); + logger.debug("Prepare Kylin data."); + if (Boolean.parseBoolean(System.getProperty("noBuild", "false"))) { + logger.debug("Query prebuilt cube."); + } else if (Boolean.parseBoolean(System.getProperty("isDeveloperMode", "false"))) { + //fullBuildCube("ci_inner_join_cube"); fullBuildCube("ci_left_join_cube"); } else { //buildAndMergeCube("ci_inner_join_cube"); @@ -218,14 +200,14 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { private void buildAndMergeCube(String cubeName) throws Exception { if (cubeName.equals("ci_inner_join_cube")) { - buildFourSegementAndMerge(cubeName); + buildFourSegmentAndMerge(cubeName); } if (cubeName.equals("ci_left_join_cube")) { - buildTwoSegementAndMerge(cubeName); + buildTwoSegmentAndMerge(cubeName); } } - private void buildTwoSegementAndMerge(String cubeName) throws Exception { + private void buildTwoSegmentAndMerge(String cubeName) throws Exception { KylinConfig config = KylinConfig.getInstanceFromEnv(); CubeManager cubeMgr = CubeManager.getInstance(config); Assert.assertTrue(config.getHdfsWorkingDirectory().startsWith("file:")); @@ -233,9 +215,7 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { // cleanup all segments first cleanupSegments(cubeName); - /** - * Round1. Build 2 segment - */ + // Round 1: Build 2 segment ExecutableState state; state = buildCuboid(cubeName, new SegmentRange.TSRange(dateToLong("2010-01-01"), dateToLong("2012-01-01"))); Assert.assertEquals(ExecutableState.SUCCEED, state); @@ -243,30 +223,24 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { state = buildCuboid(cubeName, new SegmentRange.TSRange(dateToLong("2012-01-01"), dateToLong("2015-01-01"))); Assert.assertEquals(ExecutableState.SUCCEED, state); - /** - * Round2. Merge two segments - */ + // Round 2: Merge two segments state = mergeSegments(cubeName, dateToLong("2010-01-01"), dateToLong("2015-01-01"), false); Assert.assertEquals(ExecutableState.SUCCEED, state); - /** - * validate cube segment info - */ + // validate cube segment info CubeSegment firstSegment = cubeMgr.reloadCube(cubeName).getSegments().get(0); Assert.assertEquals(new SegmentRange.TSRange(dateToLong("2010-01-01"), dateToLong("2015-01-01")), firstSegment.getSegRange()); } - private void buildFourSegementAndMerge(String cubeName) throws Exception { + private void buildFourSegmentAndMerge(String cubeName) throws Exception { Assert.assertTrue(config.getHdfsWorkingDirectory().startsWith("file:")); // cleanup all segments first cleanupSegments(cubeName); - /** - * Round1. Build 4 segment - */ + // Round 1: Build 4 segment ExecutableState state; state = buildCuboid(cubeName, new SegmentRange.TSRange(dateToLong("2010-01-01"), dateToLong("2012-06-01"))); Assert.assertEquals(ExecutableState.SUCCEED, state); @@ -280,18 +254,14 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { state = buildCuboid(cubeName, new SegmentRange.TSRange(dateToLong("2013-06-01"), dateToLong("2015-01-01"))); Assert.assertEquals(ExecutableState.SUCCEED, state); - /** - * Round2. Merge two segments - */ + // Round 2: Merge two segments state = mergeSegments(cubeName, dateToLong("2010-01-01"), dateToLong("2013-01-01"), false); Assert.assertEquals(ExecutableState.SUCCEED, state); state = mergeSegments(cubeName, dateToLong("2013-01-01"), dateToLong("2015-01-01"), false); Assert.assertEquals(ExecutableState.SUCCEED, state); - /** - * validate cube segment info - */ + // validate cube segment info CubeSegment firstSegment = cubeMgr.reloadCube(cubeName).getSegments().get(0); CubeSegment secondSegment = cubeMgr.reloadCube(cubeName).getSegments().get(1); @@ -321,20 +291,16 @@ public class NManualBuildAndQueryTest extends LocalWithSparkSessionTest { List<Pair<String, String>> queries = NExecAndComp .fetchQueries(KYLIN_SQL_BASE_DIR + File.separator + "sql"); NExecAndComp.execLimitAndValidate(queries, getProject(), joinType); - } else if (NExecAndComp.CompareLevel.SAME_SQL_COMPARE.equals(compareLevel)) { - List<Pair<String, String>> queries = NExecAndComp - .fetchQueries(KYLIN_SQL_BASE_DIR + File.separator + sqlFolder); - NExecAndComp.execAndCompare(queries, getProject(), NExecAndComp.CompareLevel.SAME_SQL_COMPARE, joinType); } else { List<Pair<String, String>> queries = NExecAndComp .fetchQueries(KYLIN_SQL_BASE_DIR + File.separator + sqlFolder); NExecAndComp.execAndCompare(queries, getProject(), compareLevel, joinType); } } catch (Throwable th) { - logger.error("Query fail on:", identity); + logger.error("Query fail on: {}", identity); return Pair.newPair(identity, th); } - logger.info("Query succeed on:", identity); + logger.info("Query succeed on: {}", identity); return Pair.newPair(identity, null); } } diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java index 113d530..977c916 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NExecAndComp.java @@ -59,8 +59,9 @@ public class NExecAndComp { public enum CompareLevel { SAME, // exec and compare SAME_ORDER, // exec and compare order - SAME_ROWCOUNT, SUBSET, NONE, // batch execute - SAME_SQL_COMPARE + SAME_ROWCOUNT, + SUBSET, + NONE, // Do not compare and just return OK } static void execLimitAndValidate(List<Pair<String, String>> queries, String prj, String joinType) { @@ -87,7 +88,7 @@ public class NExecAndComp { Dataset<Row> sparkResult = queryWithSpark(prj, sql, query.getFirst()); List<Row> kylinRows = SparkQueryTest.castDataType(kylinResult, sparkResult).toJavaRDD().collect(); List<Row> sparkRows = sparkResult.toJavaRDD().collect(); - if (!compareResults(normRows(sparkRows), normRows(kylinRows), CompareLevel.SUBSET)) { + if (compareResults(normRows(sparkRows), normRows(kylinRows), CompareLevel.SUBSET)) { throw new IllegalArgumentException("Result not match"); } } @@ -118,6 +119,7 @@ public class NExecAndComp { Dataset<Row> cubeResult = (recAndQueryResult == null) ? queryWithKylin(prj, joinType, Pair.newPair(sql, sql)) : queryWithKylin(prj, joinType, Pair.newPair(sql, sql), recAndQueryResult); addQueryPath(recAndQueryResult, query, sql); + if (compareLevel == CompareLevel.SAME) { Dataset<Row> sparkResult = queryWithSpark(prj, sql, query.getFirst()); String result = SparkQueryTest.checkAnswer(SparkQueryTest.castDataType(cubeResult, sparkResult), sparkResult, false); @@ -125,23 +127,25 @@ public class NExecAndComp { logger.error("Failed on compare query ({}) :{}", joinType, query); logger.error(result); throw new IllegalArgumentException("query (" + joinType + ") :" + query + " result not match"); + } else { + logger.debug("Passed {}", query.getFirst()); } } else if (compareLevel == CompareLevel.NONE) { Dataset<Row> sparkResult = queryWithSpark(prj, sql, query.getFirst()); List<Row> sparkRows = sparkResult.toJavaRDD().collect(); List<Row> kylinRows = SparkQueryTest.castDataType(cubeResult, sparkResult).toJavaRDD().collect(); - if (!compareResults(normRows(sparkRows), normRows(kylinRows), compareLevel)) { + if (compareResults(normRows(sparkRows), normRows(kylinRows), compareLevel)) { logger.error("Failed on compare query ({}) :{}", joinType, query); throw new IllegalArgumentException("query (" + joinType + ") :" + query + " result not match"); } } else { cubeResult.persist(); - System.out.println( - "result comparision is not available, part of the cube results: " + cubeResult.count()); - cubeResult.show(); + logger.info( + "result comparision is not available for {}, part of the cube results: {},\n {}" , query.getFirst(), + cubeResult.count(), cubeResult.showString(20, 25 , false)); cubeResult.unpersist(); } - logger.info("The query ({}) : {} cost {} (ms)", joinType, query, System.currentTimeMillis() - startTime); + logger.trace("The query ({}) : {} cost {} (ms)", query.getFirst(), "", System.currentTimeMillis() - startTime); } } @@ -286,7 +290,7 @@ public class NExecAndComp { return ret; } - public static boolean compareResults(List<Row> expectedResult, List<Row> actualResult, CompareLevel compareLevel) { + private static boolean compareResults(List<Row> expectedResult, List<Row> actualResult, CompareLevel compareLevel) { boolean good = true; if (compareLevel == CompareLevel.SAME_ORDER) { good = expectedResult.equals(actualResult); @@ -329,7 +333,7 @@ public class NExecAndComp { printRows("expected", expectedResult); printRows("actual", actualResult); } - return good; + return !good; } private static void printRows(String source, List<Row> rows) { @@ -391,15 +395,15 @@ public class NExecAndComp { } public static Dataset<Row> queryFromCube(String prj, String sqlText) { - sqlText = QueryUtil.massageSql(sqlText, prj, 0, 0, "DEFAULT", true); + sqlText = QueryUtil.massageSql(sqlText, prj, 0, 0, "DEFAULT"); return sql(prj, sqlText, null); } public static Dataset<Row> querySparkSql(String sqlText) { - logger.info("Fallback this sql to original engine..."); + logger.trace("Fallback this sql to original engine..."); long startTs = System.currentTimeMillis(); Dataset<Row> r = KylinSparkEnv.getSparkSession().sql(sqlText); - logger.info("Duration(ms): {}", (System.currentTimeMillis() - startTs)); + logger.trace("Duration(ms): {}", (System.currentTimeMillis() - startTs)); return r; } @@ -412,11 +416,11 @@ public class NExecAndComp { throw new RuntimeException("Sorry your SQL is null..."); try { - logger.info("Try to query from cube...."); + logger.trace("Try to query from cube...."); long startTs = System.currentTimeMillis(); Dataset<Row> dataset = queryCubeAndSkipCompute(prj, sqlText, parameters); - logger.info("Cool! This sql hits cube..."); - logger.info("Duration(ms): {}", (System.currentTimeMillis() - startTs)); + logger.trace("Cool! This sql hits cube..."); + logger.trace("Duration(ms): {}", (System.currentTimeMillis() - startTs)); return dataset; } catch (Throwable e) { logger.error("There is no cube can be used for query [{}]", sqlText); @@ -523,7 +527,7 @@ public class NExecAndComp { String ret = StringUtils.join(tokens, " "); ret = ret.replaceAll(specialStr, System.getProperty("line.separator")); - logger.info("The actual sql executed is: " + ret); + logger.trace("The actual sql executed is: " + ret); return ret; } diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java index c144e4a..0858b4a 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/NManualBuildAndQueryCuboidTest.java @@ -53,9 +53,9 @@ import java.util.List; import java.util.Map; import java.util.Set; -public class NManualBuildAndQueryCuboidTest extends NManualBuildAndQueryTest { +public class NManualBuildAndQueryCuboidTest extends NBuildAndQueryTest { - private static final Logger logger = LoggerFactory.getLogger(NManualBuildAndQueryTest.class); + private static final Logger logger = LoggerFactory.getLogger(NManualBuildAndQueryCuboidTest.class); private static final String DEFAULT_PROJECT = "default"; private static StructType OUT_SCHEMA = null; diff --git a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java index ba17302..f2dfa2a 100644 --- a/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java +++ b/kylin-spark-project/kylin-spark-test/src/test/java/org/apache/kylin/engine/spark2/utils/QueryUtil.java @@ -40,48 +40,44 @@ public class QueryUtil { static List<IPushDownConverter> pushDownConverters = Collections.emptyList(); - public static String massageSql(String sql, String project, int limit, int offset, String defaultSchema, boolean isCCNeeded) { + public static String massageSql(String sql, String project, int limit, int offset, String defaultSchema) { ProjectManager projectManager = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv()); ProjectInstance projectInstance = projectManager.getProject(project); KylinConfig kylinConfig = projectInstance.getConfig(); - return massageSql(kylinConfig, sql, project, limit, offset, defaultSchema, isCCNeeded); + return massageSql(kylinConfig, sql, project, limit, offset, defaultSchema); } static String massageSql(KylinConfig kylinConfig, String sql, String project, int limit, int offset, - String defaultSchema, boolean isCCNeeded) { + String defaultSchema) { String massagedSql = normalMassageSql(kylinConfig, sql, limit, offset); - massagedSql = transformSql(kylinConfig, massagedSql, project, defaultSchema, isCCNeeded); - logger.info("SQL massage result: {}", massagedSql); + massagedSql = transformSql(kylinConfig, massagedSql, project, defaultSchema); + logger.trace("SQL massage result: {}", massagedSql); return massagedSql; } - private static String transformSql(KylinConfig kylinConfig, String sql, String project, String defaultSchema, boolean isCCNeeded) { + private static String transformSql(KylinConfig kylinConfig, String sql, String project, String defaultSchema) { // customizable SQL transformation - initQueryTransformersIfNeeded(kylinConfig, isCCNeeded); + initQueryTransformersIfNeeded(kylinConfig); for (IQueryTransformer t : queryTransformers) { sql = t.transform(sql, project, defaultSchema); - logger.debug("SQL transformed by {}, result: {}", t.getClass(), sql); + logger.trace("SQL transformed by {}, result: {}", t.getClass(), sql); } return sql; } - static void initQueryTransformersIfNeeded(KylinConfig kylinConfig, boolean isCCNeeded) { + static void initQueryTransformersIfNeeded(KylinConfig kylinConfig) { String[] currentTransformers = queryTransformers.stream().map(Object::getClass).map(Class::getCanonicalName) .toArray(String[]::new); String[] configTransformers = kylinConfig.getQueryTransformers(); boolean transformersEqual = Objects.deepEquals(currentTransformers, configTransformers); - if (transformersEqual && (isCCNeeded)) { + if (transformersEqual) { return; } List<IQueryTransformer> transformers = Lists.newArrayList(); for (String clz : configTransformers) { - if (!isCCNeeded) - continue; - try { IQueryTransformer t = (IQueryTransformer) ClassUtil.newInstance(clz); - transformers.add(t); } catch (Exception e) { throw new IllegalStateException("Failed to init query transformer", e); diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java index 7d30300..455dc81 100644 --- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java +++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java @@ -48,7 +48,7 @@ public abstract class RoutingRule { String before = getPrintableText(candidates); rule.apply(candidates); String after = getPrintableText(candidates); - logger.info("Applying rule: " + rule + ", realizations before: " + before + ", realizations after: " + after); + logger.debug("Applying rule: " + rule + ", realizations before: " + before + ", realizations after: " + after); } }