[1/3] kylin git commit: KYLIN-1677 Distribute source data by certain columns when creating flat table
Repository: kylin Updated Branches: refs/heads/master 6c32fd6a2 -> 185b34ff6 KYLIN-1677 Distribute source data by certain columns when creating flat table Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/185b34ff Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/185b34ff Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/185b34ff Branch: refs/heads/master Commit: 185b34ff6e13d81a24a86893dbf85563ae3430a3 Parents: bdf5e47 Author: shaofengshi Authored: Tue May 17 18:29:59 2016 +0800 Committer: shaofengshi Committed: Wed May 18 10:38:32 2016 +0800 -- .../org/apache/kylin/cube/model/CubeDesc.java | 14 ++ .../org/apache/kylin/job/JoinedFlatTable.java | 26 ++- .../kylin/job/constant/ExecutableConstants.java | 2 +- .../apache/kylin/job/JoinedFlatTableTest.java | 4 +- .../source/hive/CreateFlatHiveTableStep.java| 115 .../apache/kylin/source/hive/HiveMRInput.java | 182 --- 6 files changed, 187 insertions(+), 156 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/185b34ff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 33466b8..6dd1c1d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -35,6 +35,7 @@ import java.util.TreeSet; import javax.annotation.Nullable; +import com.google.common.collect.Sets; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; @@ -1015,6 +1016,19 @@ public class CubeDesc extends RootPersistentEntity { return result; } +/** + * Get a column which can be used in distributing the source table + * @return + */ +public TblColRef getDistributedByColumn() { +Set shardBy = getShardByColumns(); +if 
(shardBy != null && shardBy.size() > 0) { +return shardBy.iterator().next(); +} + +return null; +} + public static CubeDesc getCopyOf(CubeDesc cubeDesc) { CubeDesc newCubeDesc = new CubeDesc(); newCubeDesc.setName(cubeDesc.getName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/185b34ff/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 6ae8110..d625ad7 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -141,11 +141,17 @@ public class JoinedFlatTable { } appendJoinStatement(intermediateTableDesc, sql, tableAliasMap); appendWhereStatement(intermediateTableDesc, sql, tableAliasMap); +appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap); return sql.toString(); } -public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) { -return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n"; +public static String generateCountDataStatement(IJoinedFlatTableDesc intermediateTableDesc, final String outputDir) { +final Map tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel()); +final StringBuilder sql = new StringBuilder(); +final String factTbl = intermediateTableDesc.getDataModel().getFactTable(); +sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) from " + factTbl + " " + tableAliasMap.get(factTbl) + "\n"); +appendWhereStatement(intermediateTableDesc, sql, tableAliasMap); +return sql.toString(); } private static Map buildTableAliasMap(DataModelDesc dataModelDesc) { @@ -211,6 +217,22 @@ public class JoinedFlatTable { } } +private static void appendDistributeStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, 
Map tableAliasMap) { +if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) { +return;//TODO: for now only cube segments support distribution +} +CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc; + +TblColRef
kylin git commit: KYLIN-1677 Distribute source data by certain columns when creating flat table
Repository: kylin Updated Branches: refs/heads/KYLIN-1677 [created] 73894cdda KYLIN-1677 Distribute source data by certain columns when creating flat table Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/73894cdd Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/73894cdd Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/73894cdd Branch: refs/heads/KYLIN-1677 Commit: 73894cdda83de817526a7557237dec7b664e097a Parents: 71cf7c8 Author: shaofengshi Authored: Tue May 17 18:29:59 2016 +0800 Committer: shaofengshi Committed: Tue May 17 18:30:18 2016 +0800 -- .../org/apache/kylin/cube/model/CubeDesc.java | 14 ++ .../org/apache/kylin/job/JoinedFlatTable.java | 26 ++- .../kylin/job/constant/ExecutableConstants.java | 2 +- .../apache/kylin/job/JoinedFlatTableTest.java | 4 +- .../source/hive/CreateFlatHiveTableStep.java| 115 .../apache/kylin/source/hive/HiveMRInput.java | 182 --- 6 files changed, 187 insertions(+), 156 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 7b6e4f7..0b06ccb 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -35,6 +35,7 @@ import java.util.TreeSet; import javax.annotation.Nullable; +import com.google.common.collect.Sets; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; @@ -1028,6 +1029,19 @@ public class CubeDesc extends RootPersistentEntity { return result; } +/** + * Get a column which can be used in distributing the source table + * @return + */ +public TblColRef getDistributedByColumn() { +Set shardBy = getShardByColumns(); +if 
(shardBy != null && shardBy.size() > 0) { +return shardBy.iterator().next(); +} + +return null; +} + public static CubeDesc getCopyOf(CubeDesc cubeDesc) { CubeDesc newCubeDesc = new CubeDesc(); newCubeDesc.setName(cubeDesc.getName()); http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java index 6ae8110..d625ad7 100644 --- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java +++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java @@ -141,11 +141,17 @@ public class JoinedFlatTable { } appendJoinStatement(intermediateTableDesc, sql, tableAliasMap); appendWhereStatement(intermediateTableDesc, sql, tableAliasMap); +appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap); return sql.toString(); } -public static String generateSelectRowCountStatement(IJoinedFlatTableDesc intermediateTableDesc, String outputDir) { -return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) FROM " + intermediateTableDesc.getTableName() + ";\n"; +public static String generateCountDataStatement(IJoinedFlatTableDesc intermediateTableDesc, final String outputDir) { +final Map tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel()); +final StringBuilder sql = new StringBuilder(); +final String factTbl = intermediateTableDesc.getDataModel().getFactTable(); +sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) from " + factTbl + " " + tableAliasMap.get(factTbl) + "\n"); +appendWhereStatement(intermediateTableDesc, sql, tableAliasMap); +return sql.toString(); } private static Map buildTableAliasMap(DataModelDesc dataModelDesc) { @@ -211,6 +217,22 @@ public class JoinedFlatTable { } } +private static void appendDistributeStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, 
Map tableAliasMap) { +if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) { +return;//TODO: for now only cube segments support distribution +} +CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) intermediateTableDesc; + +TblColRef