[1/3] kylin git commit: KYLIN-1677 Distribute source data by certain columns when creating flat table

2016-05-17 Thread shaofengshi
Repository: kylin
Updated Branches:
  refs/heads/master 6c32fd6a2 -> 185b34ff6


KYLIN-1677 Distribute source data by certain columns when creating flat table

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/185b34ff
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/185b34ff
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/185b34ff

Branch: refs/heads/master
Commit: 185b34ff6e13d81a24a86893dbf85563ae3430a3
Parents: bdf5e47
Author: shaofengshi 
Authored: Tue May 17 18:29:59 2016 +0800
Committer: shaofengshi 
Committed: Wed May 18 10:38:32 2016 +0800

--
 .../org/apache/kylin/cube/model/CubeDesc.java   |  14 ++
 .../org/apache/kylin/job/JoinedFlatTable.java   |  26 ++-
 .../kylin/job/constant/ExecutableConstants.java |   2 +-
 .../apache/kylin/job/JoinedFlatTableTest.java   |   4 +-
 .../source/hive/CreateFlatHiveTableStep.java| 115 
 .../apache/kylin/source/hive/HiveMRInput.java   | 182 ---
 6 files changed, 187 insertions(+), 156 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kylin/blob/185b34ff/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
--
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 33466b8..6dd1c1d 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -1015,6 +1016,19 @@ public class CubeDesc extends RootPersistentEntity {
 return result;
 }
 
+/**
+ * Get a column which can be used in distributing the source table
+ * @return
+ */
+public TblColRef getDistributedByColumn() {
+Set shardBy = getShardByColumns();
+if (shardBy != null && shardBy.size() > 0) {
+return shardBy.iterator().next();
+}
+
+return null;
+}
+
 public static CubeDesc getCopyOf(CubeDesc cubeDesc) {
 CubeDesc newCubeDesc = new CubeDesc();
 newCubeDesc.setName(cubeDesc.getName());

http://git-wip-us.apache.org/repos/asf/kylin/blob/185b34ff/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
--
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae8110..d625ad7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -141,11 +141,17 @@ public class JoinedFlatTable {
 }
 appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
 appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap);
 return sql.toString();
 }
 
-public static String generateSelectRowCountStatement(IJoinedFlatTableDesc 
intermediateTableDesc, String outputDir) {
-return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) 
FROM " + intermediateTableDesc.getTableName() + ";\n";
+public static String generateCountDataStatement(IJoinedFlatTableDesc 
intermediateTableDesc, final String outputDir) {
+final Map tableAliasMap = 
buildTableAliasMap(intermediateTableDesc.getDataModel());
+final StringBuilder sql = new StringBuilder();
+final String factTbl = 
intermediateTableDesc.getDataModel().getFactTable();
+sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT 
count(*) from " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
+appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+return sql.toString();
 }
 
 private static Map buildTableAliasMap(DataModelDesc 
dataModelDesc) {
@@ -211,6 +217,22 @@ public class JoinedFlatTable {
 }
 }
 
+private static void appendDistributeStatement(IJoinedFlatTableDesc 
intermediateTableDesc, StringBuilder sql, Map tableAliasMap) {
+if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+return;//TODO: for now only cube segments support distribution
+}
+CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) 
intermediateTableDesc;
+
+TblColRef 

kylin git commit: KYLIN-1677 Distribute source data by certain columns when creating flat table

2016-05-17 Thread shaofengshi
Repository: kylin
Updated Branches:
  refs/heads/KYLIN-1677 [created] 73894cdda


KYLIN-1677 Distribute source data by certain columns when creating flat table

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/73894cdd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/73894cdd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/73894cdd

Branch: refs/heads/KYLIN-1677
Commit: 73894cdda83de817526a7557237dec7b664e097a
Parents: 71cf7c8
Author: shaofengshi 
Authored: Tue May 17 18:29:59 2016 +0800
Committer: shaofengshi 
Committed: Tue May 17 18:30:18 2016 +0800

--
 .../org/apache/kylin/cube/model/CubeDesc.java   |  14 ++
 .../org/apache/kylin/job/JoinedFlatTable.java   |  26 ++-
 .../kylin/job/constant/ExecutableConstants.java |   2 +-
 .../apache/kylin/job/JoinedFlatTableTest.java   |   4 +-
 .../source/hive/CreateFlatHiveTableStep.java| 115 
 .../apache/kylin/source/hive/HiveMRInput.java   | 182 ---
 6 files changed, 187 insertions(+), 156 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
--
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 7b6e4f7..0b06ccb 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.ArrayUtils;
@@ -1028,6 +1029,19 @@ public class CubeDesc extends RootPersistentEntity {
 return result;
 }
 
+/**
+ * Get a column which can be used in distributing the source table
+ * @return
+ */
+public TblColRef getDistributedByColumn() {
+Set shardBy = getShardByColumns();
+if (shardBy != null && shardBy.size() > 0) {
+return shardBy.iterator().next();
+}
+
+return null;
+}
+
 public static CubeDesc getCopyOf(CubeDesc cubeDesc) {
 CubeDesc newCubeDesc = new CubeDesc();
 newCubeDesc.setName(cubeDesc.getName());

http://git-wip-us.apache.org/repos/asf/kylin/blob/73894cdd/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
--
diff --git a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
index 6ae8110..d625ad7 100644
--- a/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
@@ -141,11 +141,17 @@ public class JoinedFlatTable {
 }
 appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
 appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+appendDistributeStatement(intermediateTableDesc, sql, tableAliasMap);
 return sql.toString();
 }
 
-public static String generateSelectRowCountStatement(IJoinedFlatTableDesc 
intermediateTableDesc, String outputDir) {
-return "INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT count(*) 
FROM " + intermediateTableDesc.getTableName() + ";\n";
+public static String generateCountDataStatement(IJoinedFlatTableDesc 
intermediateTableDesc, final String outputDir) {
+final Map tableAliasMap = 
buildTableAliasMap(intermediateTableDesc.getDataModel());
+final StringBuilder sql = new StringBuilder();
+final String factTbl = 
intermediateTableDesc.getDataModel().getFactTable();
+sql.append("INSERT OVERWRITE DIRECTORY '" + outputDir + "' SELECT 
count(*) from " + factTbl + " " + tableAliasMap.get(factTbl) + "\n");
+appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
+return sql.toString();
 }
 
 private static Map buildTableAliasMap(DataModelDesc 
dataModelDesc) {
@@ -211,6 +217,22 @@ public class JoinedFlatTable {
 }
 }
 
+private static void appendDistributeStatement(IJoinedFlatTableDesc 
intermediateTableDesc, StringBuilder sql, Map tableAliasMap) {
+if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
+return;//TODO: for now only cube segments support distribution
+}
+CubeJoinedFlatTableDesc desc = (CubeJoinedFlatTableDesc) 
intermediateTableDesc;
+
+TblColRef