This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master-hadoop3.1
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 766a472bc294e06ac2f13c5437bda349ff3aa480
Author: shaofengshi <shaofeng...@apache.org>
AuthorDate: Mon Sep 10 22:44:58 2018 +0800

    KYLIN-2565 Resolve conflict
---
 pom.xml                                            | 786 +--------------------
 .../org/apache/kylin/rest/security/MockHTable.java | 745 -------------------
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java        |  26 +-
 .../kylin/storage/hbase/steps/CubeHFileJob.java    |   6 +-
 .../storage/hbase/steps/HFileOutputFormat3.java    | 673 ------------------
 .../storage/hbase/steps/CubeHFileMapperTest.java   |  24 +-
 6 files changed, 58 insertions(+), 2202 deletions(-)

diff --git a/pom.xml b/pom.xml
index 797188d..c253753 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,11 @@
         <!-- Spark versions -->
        <spark.version>2.1.2</spark.version>
        <kryo.version>4.0.0</kryo.version>
+        <!-- mysql versions -->
+        <mysql-connector.version>5.1.8</mysql-connector.version>
+        <!-- Scala versions -->
+        <scala.version>2.11.0</scala.version>
        <commons-configuration.version>1.10</commons-configuration.version>
        <!-- <reflections.version>0.9.10</reflections.version> -->
@@ -127,6 +131,8 @@
        <spring.framework.security.extensions.version>1.0.2.RELEASE</spring.framework.security.extensions.version>
        <opensaml.version>2.6.6</opensaml.version>
        <aspectj.version>1.8.9</aspectj.version>
+        <!-- API forbidden -->
+        <forbiddenapis.version>2.3</forbiddenapis.version>
        <!-- Sonar -->
        <sonar.java.coveragePlugin>jacoco</sonar.java.coveragePlugin>
@@ -558,6 +564,13 @@
                <version>${hbase-hadoop2.version}</version>
                <scope>test</scope>
            </dependency>
+            <!-- jdbc dependencies -->
+            <dependency>
+                <groupId>mysql</groupId>
+                <artifactId>mysql-connector-java</artifactId>
+                <version>${mysql-connector.version}</version>
+                <scope>provided</scope>
+            </dependency>
            <!-- Hive dependencies -->
            <dependency>
                <groupId>org.apache.hive</groupId>
@@ -918,764 +931,27 @@
                <version>${tomcat.version}</version>
                <scope>provided</scope>
            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-library</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-compiler</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.scala-lang</groupId>
+                <artifactId>scala-reflect</artifactId>
+                <version>${scala.version}</version>
+            </dependency>
        </dependencies>
    </dependencyManagement>
->>>>>>> KYLIN-2565, upgrade to Hadoop3.0
-    <dependencies>
-        <!-- Kylin -->
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>atopcalcite</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-common</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-metrics</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-metrics-reporter-hive</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-metrics-reporter-kafka</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-metadata</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.kylin</groupId>
-            <artifactId>kylin-core-dictionary</artifactId>
-            <version>${project.version}</version>
-
</dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-cube</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-job</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-storage</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-mr</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-engine-spark</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-source-hive</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-source-kafka</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-storage-hbase</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-query</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-server-base</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-server</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-jdbc</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-assembly</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-tool</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-tool-assembly</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-it</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-common</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-storage</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-storage-hbase</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-server-base</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-job</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.kylin</groupId> - <artifactId>kylin-core-cube</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - <dependency> - 
<groupId>org.apache.kylin</groupId> - <artifactId>kylin-assembly</artifactId> - <version>${project.version}</version> - <type>test-jar</type> - </dependency> - - <!-- Hadoop2 dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - <version>${hadoop2.version}</version> - <scope>provided</scope> - <exclusions> - <exclusion> - <groupId>javax.servlet</groupId> - <artifactId>servlet-api</artifactId> - </exclusion> - <exclusion> - <groupId>net.java.dev.jets3t</groupId> - <artifactId>jets3t</artifactId> - </exclusion> - <exclusion> - <groupId>javax.servlet.jsp</groupId> - <artifactId>jsp-api</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-hdfs</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-app</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-api</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-common</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-jobclient</artifactId> - <version>${hadoop2.version}</version> - <type>test-jar</type> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-annotations</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-auth</artifactId> - <version>${hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-minicluster</artifactId> - <version>${hadoop2.version}</version> - <optional>true</optional> - </dependency> - <dependency> - <groupId>org.apache.zookeeper</groupId> - <artifactId>zookeeper</artifactId> - <version>${zookeeper.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-framework</artifactId> - <version>${curator.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <version>${curator.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-client</artifactId> - <version>${curator.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - <version>${jsr305.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.google.guava</groupId> - <artifactId>guava</artifactId> - <version>${guava.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.jcraft</groupId> - <artifactId>jsch</artifactId> - <version>${jsch.version}</version> - 
<scope>provided</scope> - </dependency> - <dependency> - <groupId>commons-cli</groupId> - <artifactId>commons-cli</artifactId> - <version>${commons-cli.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>commons-lang</groupId> - <artifactId>commons-lang</artifactId> - <version>${commons-lang.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-math3</artifactId> - <version>${commons-math3.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>commons-io</groupId> - <artifactId>commons-io</artifactId> - <version>${commons-io.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>commons-fileupload</groupId> - <artifactId>commons-fileupload</artifactId> - <version>${commons-upload.version}</version> - </dependency> - <dependency> - <groupId>commons-collections</groupId> - <artifactId>commons-collections</artifactId> - <version>${commons-collections.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-pool2</artifactId> - <version>${commons-pool.version}</version> - </dependency> - - <!-- HBase2 dependencies --> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-hadoop2-compat</artifactId> - <version>${hbase-hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <version>${hbase-hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-client</artifactId> - <version>${hbase-hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-server</artifactId> - <version>${hbase-hadoop2.version}</version> - </dependency> - <dependency> - <groupId>org.apache.mrunit</groupId> - <artifactId>mrunit</artifactId> - <version>${mrunit.version}</version> - <classifier>hadoop2</classifier> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> - <artifactId>hbase-testing-util</artifactId> - <version>${hbase-hadoop2.version}</version> - <scope>test</scope> - </dependency> - <!-- jdbc dependencies --> - <dependency> - <groupId>mysql</groupId> - <artifactId>mysql-connector-java</artifactId> - <version>${mysql-connector.version}</version> - <scope>provided</scope> - </dependency> - <!-- Hive dependencies --> - <dependency> - <groupId>org.apache.hive</groupId> - <artifactId>hive-jdbc</artifactId> - <version>${hive.version}</version> - </dependency> - <dependency> - <groupId>org.apache.hive.hcatalog</groupId> - <artifactId>hive-hcatalog-core</artifactId> - <version>${hive-hcatalog.version}</version> - </dependency> - <!-- Yarn dependencies --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-resourcemanager</artifactId> - <version>${yarn.version}</version> - </dependency> - - <!-- Calcite dependencies --> - <dependency> - <groupId>org.apache.calcite</groupId> - <artifactId>calcite-core</artifactId> - <version>${calcite.version}</version> - <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.calcite</groupId> - <artifactId>calcite-linq4j</artifactId> - <version>${calcite.version}</version> - </dependency> - <dependency> - 
<groupId>org.apache.calcite.avatica</groupId> - <artifactId>avatica-core</artifactId> - <version>${avatica.version}</version> - </dependency> - <dependency> - <groupId>org.apache.calcite.avatica</groupId> - <artifactId>avatica</artifactId> - <version>${avatica.version}</version> - </dependency> - <!-- Workaround for hive 0.14 avatica dependency --> - <dependency> - <groupId>org.apache.calcite</groupId> - <artifactId>calcite-avatica</artifactId> - <version>1.6.0</version> - <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-core</artifactId> - <version>${jackson.version}</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-databind</artifactId> - <version>${jackson.version}</version> - </dependency> - <dependency> - <groupId>com.fasterxml.jackson.core</groupId> - <artifactId>jackson-annotations</artifactId> - <version>${jackson.version}</version> - </dependency> - - <!-- Spark dependency --> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-core_2.11</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-sql_2.11</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.spark</groupId> - <artifactId>spark-hive_2.11</artifactId> - <version>${spark.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>com.esotericsoftware</groupId> - <artifactId>kryo-shaded</artifactId> - <version>${kryo.version}</version> - <scope>provided</scope> - </dependency> - <!-- Kafka dependency --> - <dependency> - <groupId>org.apache.kafka</groupId> - <artifactId>kafka_2.11</artifactId> - <version>${kafka.version}</version> - <scope>provided</scope> - </dependency> - - <!-- Other dependencies --> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> - <version>${commons-lang3.version}</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-email</artifactId> - <version>${commons-email.version}</version> - </dependency> - <dependency> - <groupId>commons-validator</groupId> - <artifactId>commons-validator</artifactId> - <version>${commons-validator.version}</version> - </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - <version>${commons-compress}</version> - </dependency> - <dependency> - <groupId>org.freemarker</groupId> - <artifactId>freemarker</artifactId> - <version>${freemarker.version}</version> - </dependency> - <dependency> - <groupId>org.rocksdb</groupId> - <artifactId>rocksdbjni</artifactId> - <version>${rocksdb.version}</version> - </dependency> - - <!-- Logging --> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <version>${log4j.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <version>${slf4j.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - <version>${slf4j.version}</version> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - 
<artifactId>slf4j-api</artifactId> - <version>${slf4j.version}</version> - </dependency> - - <!-- Metrics --> - <dependency> - <groupId>io.dropwizard.metrics</groupId> - <artifactId>metrics-core</artifactId> - <version>${dropwizard.version}</version> - </dependency> - - <!-- Test --> - <dependency> - <groupId>junit</groupId> - <artifactId>junit</artifactId> - <version>${junit.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.dbunit</groupId> - <artifactId>dbunit</artifactId> - <version>${dbunit.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.maven</groupId> - <artifactId>maven-model</artifactId> - <version>${maven-model.version}</version> - </dependency> - <dependency> - <groupId>com.h2database</groupId> - <artifactId>h2</artifactId> - <version>${h2.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>xerces</groupId> - <artifactId>xercesImpl</artifactId> - <version>${xerces.version}</version> - </dependency> - <dependency> - <groupId>xalan</groupId> - <artifactId>xalan</artifactId> - <version>${xalan.version}</version> - </dependency> - <dependency> - <groupId>com.github.jbellis</groupId> - <artifactId>jamm</artifactId> - <version>${jamm.version}</version> - </dependency> - <dependency> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - <version>${apache-httpclient.version}</version> - </dependency> - <dependency> - <groupId>org.roaringbitmap</groupId> - <artifactId>RoaringBitmap</artifactId> - <version>${roaring.version}</version> - </dependency> - <dependency> - <groupId>com.tdunning</groupId> - <artifactId>t-digest</artifactId> - <version>${t-digest.version}</version> - </dependency> - <dependency> - <groupId>cglib</groupId> - <artifactId>cglib</artifactId> - <version>${cglib.version}</version> - </dependency> - <dependency> - <groupId>net.sf.supercsv</groupId> - <artifactId>super-csv</artifactId> - <version>${supercsv.version}</version> - </dependency> - <dependency> - <groupId>org.aspectj</groupId> - <artifactId>aspectjrt</artifactId> - <version>${aspectj.version}</version> - </dependency> - <dependency> - <groupId>org.aspectj</groupId> - <artifactId>aspectjweaver</artifactId> - <version>${aspectj.version}</version> - </dependency> - <dependency> - <groupId>com.thetransactioncompany</groupId> - <artifactId>cors-filter</artifactId> - <version>${cors.version}</version> - </dependency> - <dependency> - <groupId>net.sf.ehcache</groupId> - <artifactId>ehcache</artifactId> - <version>${ehcache.version}</version> - </dependency> - <dependency> - <groupId>org.opensaml</groupId> - <artifactId>opensaml</artifactId> - <version>${opensaml.version}</version> - </dependency> - - - <!-- Spring Core --> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-webmvc</artifactId> - <version>${spring.framework.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-jdbc</artifactId> - <version>${spring.framework.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-aop</artifactId> - <version>${spring.framework.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-context-support</artifactId> - <version>${spring.framework.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-test</artifactId> - 
<version>${spring.framework.version}</version> - </dependency> - <!-- Spring Security --> - <dependency> - <groupId>org.springframework.security</groupId> - <artifactId>spring-security-acl</artifactId> - <version>${spring.framework.security.version}</version> - </dependency> - <dependency> - <groupId>org.springframework.security</groupId> - <artifactId>spring-security-config</artifactId> - <version>${spring.framework.security.version}</version> - </dependency> - <dependency> - <groupId>org.springframework.security</groupId> - <artifactId>spring-security-core</artifactId> - <version>${spring.framework.security.version}</version> - </dependency> - <dependency> - <groupId>org.springframework.security</groupId> - <artifactId>spring-security-ldap</artifactId> - <version>${spring.framework.security.version}</version> - </dependency> - <dependency> - <groupId>org.springframework.security</groupId> - <artifactId>spring-security-web</artifactId> - <version>${spring.framework.security.version}</version> - </dependency> - <dependency> - <groupId>org.springframework.security.extensions</groupId> - <artifactId>spring-security-saml2-core</artifactId> - <version>${spring.framework.security.extensions.version}</version> - </dependency> - - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-server</artifactId> - <version>${jetty.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.eclipse.jetty</groupId> - <artifactId>jetty-webapp</artifactId> - <version>${jetty.version}</version> - <scope>test</scope> - </dependency> - <dependency> - <groupId>org.apache.tomcat</groupId> - <artifactId>tomcat-catalina</artifactId> - <version>${tomcat.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.tomcat</groupId> - <artifactId>tomcat-jasper</artifactId> - <version>${tomcat.version}</version> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.tomcat.embed</groupId> - <artifactId>tomcat-embed-core</artifactId> - <version>${tomcat.version}</version> - <scope>provided</scope> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-library</artifactId> - <version>${scala.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-compiler</artifactId> - <version>${scala.version}</version> - </dependency> - - <dependency> - <groupId>org.scala-lang</groupId> - <artifactId>scala-reflect</artifactId> - <version>${scala.version}</version> - </dependency> - </dependencies> - </dependencyManagement> - - <dependencies> - - <!-- the logging dependencies are inherited by all modules for their generality - log4j and slf4j-log4j12 test scope only for UT/IT use --> - <dependency> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>jcl-over-slf4j</artifactId> - </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> - <!--for transitive dependencies like commons-collectinos, commons-lang --> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </dependency> - </dependencies> - <repositories> <repository> <id>central</id> diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java b/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java deleted file mode 100644 index 38c65a8..0000000 --- a/server-base/src/main/java/org/apache/kylin/rest/security/MockHTable.java +++ /dev/null @@ -1,745 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This file is licensed to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the - * License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - */ - -package org.apache.kylin.rest.security; - -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.NoSuchElementException; -import java.util.TreeMap; - -import org.apache.commons.lang.NotImplementedException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Append; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Increment; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; -import org.apache.hadoop.hbase.client.RowMutations; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.client.metrics.ScanMetrics; -import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.Filter; -import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; -import org.apache.hadoop.hbase.util.Bytes; - -import com.google.protobuf.Descriptors; -import com.google.protobuf.Message; -import com.google.protobuf.Service; -import com.google.protobuf.ServiceException; - -/** - * MockHTable. 
- * - * original MockHTable (by agaoglu) : https://gist.github.com/agaoglu/613217#file_mock_h_table.java - * - * Modifications - * - * <ul> - * <li>fix filter (by k-mack) : https://gist.github.com/k-mack/4600133</li> - * <li>fix batch() : implement all mutate operation and fix result[] count.</li> - * <li>fix exists()</li> - * <li>fix increment() : wrong return value</li> - * <li>check columnFamily</li> - * <li>implement mutateRow()</li> - * <li>implement getTableName()</li> - * <li>implement getTableDescriptor()</li> - * <li>throws RuntimeException when unimplemented method was called.</li> - * <li>remove some methods for loading data, checking values ...</li> - * </ul> - */ -public class MockHTable implements Table { - private final String tableName; - private final List<String> columnFamilies = new ArrayList<>(); - - private NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data = new TreeMap<>( - Bytes.BYTES_COMPARATOR); - - private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, int maxVersions) { - return toKeyValue(row, rowdata, 0, Long.MAX_VALUE, maxVersions); - } - - public MockHTable(String tableName) { - this.tableName = tableName; - } - - public MockHTable(String tableName, String... columnFamilies) { - this.tableName = tableName; - this.columnFamilies.addAll(Arrays.asList(columnFamilies)); - } - - public void addColumnFamily(String columnFamily) { - this.columnFamilies.add(columnFamily); - } - - @Override - public TableName getName() { - return null; - } - - /** - * {@inheritDoc} - */ - @Override - public Configuration getConfiguration() { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); - } - - /** - * {@inheritDoc} - */ - @Override - public HTableDescriptor getTableDescriptor() throws IOException { - HTableDescriptor table = new HTableDescriptor(tableName); - for (String columnFamily : columnFamilies) { - table.addFamily(new HColumnDescriptor(columnFamily)); - } - return table; - } - - /** - * {@inheritDoc} - */ - @Override - public void mutateRow(RowMutations rm) throws IOException { - // currently only support Put and Delete - for (Mutation mutation : rm.getMutations()) { - if (mutation instanceof Put) { - put((Put) mutation); - } else if (mutation instanceof Delete) { - delete((Delete) mutation); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public Result append(Append append) throws IOException { - throw new RuntimeException(this.getClass() + " does NOT implement this method."); - } - - private static List<Cell> toKeyValue(byte[] row, NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowdata, long timestampStart, long timestampEnd, int maxVersions) { - List<Cell> ret = new ArrayList<>(); - for (byte[] family : rowdata.keySet()) - for (byte[] qualifier : rowdata.get(family).keySet()) { - int versionsAdded = 0; - for (Map.Entry<Long, byte[]> tsToVal : rowdata.get(family).get(qualifier).descendingMap().entrySet()) { - if (versionsAdded++ == maxVersions) - break; - Long timestamp = tsToVal.getKey(); - if (timestamp < timestampStart) - continue; - if (timestamp > timestampEnd) - continue; - byte[] value = tsToVal.getValue(); - ret.add(new KeyValue(row, family, qualifier, timestamp, value)); - } - } - return ret; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean exists(Get get) throws IOException { - Result result = get(get); - return result != null && result.isEmpty() == false; - } 
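// Illustrative sketch, not part of this patch: MockHTable emulates an HBase
// table with nested sorted maps -- row -> family -> qualifier -> timestamp ->
// value -- each level keyed with HBase's byte-array comparator so rows iterate
// in scan order. The class and method names below are hypothetical.

import java.util.NavigableMap;
import java.util.TreeMap;

import org.apache.hadoop.hbase.util.Bytes;

class CellStoreSketch {
    private final NavigableMap<byte[], NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>> data =
            new TreeMap<>(Bytes.BYTES_COMPARATOR);

    // Mirrors what MockHTable.put() does for each KeyValue in a Put.
    void putCell(byte[] row, byte[] family, byte[] qualifier, long ts, byte[] value) {
        data.computeIfAbsent(row, r -> new TreeMap<>(Bytes.BYTES_COMPARATOR))
            .computeIfAbsent(family, f -> new TreeMap<>(Bytes.BYTES_COMPARATOR))
            .computeIfAbsent(qualifier, q -> new TreeMap<>())
            .put(ts, value);
    }

    // Mirrors MockHTable.get() for a single column: the highest timestamp
    // (lastEntry of the version map) is the visible value.
    byte[] getLatest(byte[] row, byte[] family, byte[] qualifier) {
        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> families = data.get(row);
        if (families == null) return null;
        NavigableMap<byte[], NavigableMap<Long, byte[]>> qualifiers = families.get(family);
        if (qualifiers == null) return null;
        NavigableMap<Long, byte[]> versions = qualifiers.get(qualifier);
        return (versions == null || versions.isEmpty()) ? null : versions.lastEntry().getValue();
    }
}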
- - @Override - public boolean[] existsAll(List<Get> list) throws IOException { - return new boolean[0]; - } - - /** - * {@inheritDoc} - */ - @Override - public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException { - results = batch(actions); - } - - /** - * {@inheritDoc} - */ - public Object[] batch(List<? extends Row> actions) throws IOException, InterruptedException { - Object[] results = new Object[actions.size()]; // same size. - for (int i = 0; i < actions.size(); i++) { - Row r = actions.get(i); - if (r instanceof Delete) { - delete((Delete) r); - results[i] = new Result(); - } - if (r instanceof Put) { - put((Put) r); - results[i] = new Result(); - } - if (r instanceof Get) { - Result result = get((Get) r); - results[i] = result; - } - if (r instanceof Increment) { - Result result = increment((Increment) r); - results[i] = result; - } - if (r instanceof Append) { - Result result = append((Append) r); - results[i] = result; - } - } - return results; - } - - @Override - public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) - throws IOException, InterruptedException { - - } - - @Override - public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback) throws IOException, InterruptedException { - return new Object[0]; - } - - /** - * {@inheritDoc} - */ - @Override - public Result get(Get get) throws IOException { - if (!data.containsKey(get.getRow())) - return new Result(); - byte[] row = get.getRow(); - List<Cell> kvs = new ArrayList<>(); - if (!get.hasFamilies()) { - kvs = toKeyValue(row, data.get(row), get.getMaxVersions()); - } else { - for (byte[] family : get.getFamilyMap().keySet()) { - if (data.get(row).get(family) == null) - continue; - NavigableSet<byte[]> qualifiers = get.getFamilyMap().get(family); - if (qualifiers == null || qualifiers.isEmpty()) - qualifiers = data.get(row).get(family).navigableKeySet(); - for (byte[] qualifier : qualifiers) { - if (qualifier == null) - qualifier = "".getBytes(StandardCharsets.UTF_8); - if (!data.get(row).containsKey(family) || !data.get(row).get(family).containsKey(qualifier) - || data.get(row).get(family).get(qualifier).isEmpty()) - continue; - Map.Entry<Long, byte[]> timestampAndValue = data.get(row).get(family).get(qualifier).lastEntry(); - kvs.add(new KeyValue(row, family, qualifier, timestampAndValue.getKey(), - timestampAndValue.getValue())); - } - } - } - Filter filter = get.getFilter(); - if (filter != null) { - kvs = filter(filter, kvs); - } - - return Result.create(kvs); - } - - /** - * {@inheritDoc} - */ - @Override - public Result[] get(List<Get> gets) throws IOException { - List<Result> results = new ArrayList<Result>(); - for (Get g : gets) { - results.add(get(g)); - } - return results.toArray(new Result[results.size()]); - } - - /** - * {@inheritDoc} - */ - @Override - public ResultScanner getScanner(Scan scan) throws IOException { - final List<Result> ret = new ArrayList<Result>(); - byte[] st = scan.getStartRow(); - byte[] sp = scan.getStopRow(); - Filter filter = scan.getFilter(); - - for (byte[] row : data.keySet()) { - // if row is equal to startRow emit it. When startRow (inclusive) and - // stopRow (exclusive) is the same, it should not be excluded which would - // happen w/o this control. 
- if (st != null && st.length > 0 && Bytes.BYTES_COMPARATOR.compare(st, row) != 0) { - // if row is before startRow do not emit, pass to next row - if (st != null && st.length > 0 && Bytes.BYTES_COMPARATOR.compare(st, row) > 0) - continue; - // if row is equal to stopRow or after it do not emit, stop iteration - if (sp != null && sp.length > 0 && Bytes.BYTES_COMPARATOR.compare(sp, row) <= 0) - break; - } - - List<Cell> kvs = null; - if (!scan.hasFamilies()) { - kvs = toKeyValue(row, data.get(row), scan.getTimeRange().getMin(), scan.getTimeRange().getMax(), - scan.getMaxVersions()); - } else { - kvs = new ArrayList<>(); - for (byte[] family : scan.getFamilyMap().keySet()) { - if (data.get(row).get(family) == null) - continue; - NavigableSet<byte[]> qualifiers = scan.getFamilyMap().get(family); - if (qualifiers == null || qualifiers.isEmpty()) - qualifiers = data.get(row).get(family).navigableKeySet(); - for (byte[] qualifier : qualifiers) { - if (data.get(row).get(family).get(qualifier) == null) - continue; - for (Long timestamp : data.get(row).get(family).get(qualifier).descendingKeySet()) { - if (timestamp < scan.getTimeRange().getMin()) - continue; - if (timestamp > scan.getTimeRange().getMax()) - continue; - byte[] value = data.get(row).get(family).get(qualifier).get(timestamp); - kvs.add(new KeyValue(row, family, qualifier, timestamp, value)); - if (kvs.size() == scan.getMaxVersions()) { - break; - } - } - } - } - } - if (filter != null) { - kvs = filter(filter, kvs); - // Check for early out optimization - if (filter.filterAllRemaining()) { - break; - } - } - if (!kvs.isEmpty()) { - ret.add(Result.create(kvs)); - } - } - - return new ResultScanner() { - private final Iterator<Result> iterator = ret.iterator(); - - public Iterator<Result> iterator() { - return iterator; - } - - public Result[] next(int nbRows) throws IOException { - ArrayList<Result> resultSets = new ArrayList<Result>(nbRows); - for (int i = 0; i < nbRows; i++) { - Result next = next(); - if (next != null) { - resultSets.add(next); - } else { - break; - } - } - return resultSets.toArray(new Result[resultSets.size()]); - } - - public Result next() throws IOException { - try { - return iterator().next(); - } catch (NoSuchElementException e) { - return null; - } - } - - public void close() { - } - - @Override - public boolean renewLease() { - return false; - } - - @Override - public ScanMetrics getScanMetrics() { - return null; - } - }; - } - - /** - * Follows the logical flow through the filter methods for a single row. - * - * @param filter HBase filter. - * @param kvs List of a row's KeyValues - * @return List of KeyValues that were not filtered. - */ - private List<Cell> filter(Filter filter, List<Cell> kvs) throws IOException { - filter.reset(); - - List<Cell> tmp = new ArrayList<>(kvs.size()); - tmp.addAll(kvs); - - /* - * Note. Filter flow for a single row. Adapted from - * "HBase: The Definitive Guide" (p. 163) by Lars George, 2011. - * See Figure 4-2 on p. 163. 
- */ - boolean filteredOnRowKey = false; - List<Cell> nkvs = new ArrayList<>(tmp.size()); - for (Cell kv : tmp) { - if (filter.filterRowKey(kv)) { - filteredOnRowKey = true; - break; - } - Filter.ReturnCode filterResult = filter.filterKeyValue(kv); - if (filterResult == Filter.ReturnCode.INCLUDE) { - nkvs.add(kv); - } else if (filterResult == Filter.ReturnCode.NEXT_ROW) { - break; - } else if (filterResult == Filter.ReturnCode.NEXT_COL || filterResult == Filter.ReturnCode.SKIP) { - continue; - } - /* - * Ignoring next key hint which is a optimization to reduce file - * system IO - */ - } - if (filter.hasFilterRow() && !filteredOnRowKey) { - filter.filterRow(); - } - if (filter.filterRow() || filteredOnRowKey) { - nkvs.clear(); - } - tmp = nkvs; - return tmp; - } - - /** - * {@inheritDoc} - */ - @Override - public ResultScanner getScanner(byte[] family) throws IOException { - Scan scan = new Scan(); - scan.addFamily(family); - return getScanner(scan); - } - - /** - * {@inheritDoc} - */ - @Override - public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException { - Scan scan = new Scan(); - scan.addColumn(family, qualifier); - return getScanner(scan); - } - - private <K, V> V forceFind(NavigableMap<K, V> map, K key, V newObject) { - V data = map.get(key); - if (data == null) { - data = newObject; - map.put(key, data); - } - return data; - } - - /** - * {@inheritDoc} - */ - @Override - public void put(Put put) throws IOException { - byte[] row = put.getRow(); - NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> rowData = forceFind(data, row, new TreeMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>>(Bytes.BYTES_COMPARATOR)); - for (byte[] family : put.getFamilyCellMap().keySet()) { - if (columnFamilies.contains(new String(family, StandardCharsets.UTF_8)) == false) { - throw new RuntimeException("Not Exists columnFamily : " + new String(family)); - } - NavigableMap<byte[], NavigableMap<Long, byte[]>> familyData = forceFind(rowData, family, new TreeMap<byte[], NavigableMap<Long, byte[]>>(Bytes.BYTES_COMPARATOR)); - for (Cell kv : put.getFamilyCellMap().get(family)) { - CellUtil.updateLatestStamp(kv, System.currentTimeMillis()); - byte[] qualifier = kv.getQualifierArray(); - NavigableMap<Long, byte[]> qualifierData = forceFind(familyData, qualifier, new TreeMap<Long, byte[]>()); - qualifierData.put(kv.getTimestamp(), kv.getValueArray()); - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void put(List<Put> puts) throws IOException { - for (Put put : puts) { - put(put); - } - - } - - private boolean check(byte[] row, byte[] family, byte[] qualifier, byte[] value) { - if (value == null || value.length == 0) - return !data.containsKey(row) || !data.get(row).containsKey(family) - || !data.get(row).get(family).containsKey(qualifier); - else - return data.containsKey(row) && data.get(row).containsKey(family) - && data.get(row).get(family).containsKey(qualifier) - && !data.get(row).get(family).get(qualifier).isEmpty() - && Arrays.equals(data.get(row).get(family).get(qualifier).lastEntry().getValue(), value); - } - - /** - * {@inheritDoc} - */ - @Override - public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException { - if (check(row, family, qualifier, value)) { - put(put); - return true; - } - return false; - } - - @Override - public boolean checkAndPut(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, - byte[] bytes3, Put put) throws IOException { - 
return false; - } - - /** - * {@inheritDoc} - */ - @Override - public void delete(Delete delete) throws IOException { - byte[] row = delete.getRow(); - if (data.get(row) == null) - return; - if (delete.getFamilyCellMap().size() == 0) { - data.remove(row); - return; - } - for (byte[] family : delete.getFamilyCellMap().keySet()) { - if (data.get(row).get(family) == null) - continue; - if (delete.getFamilyCellMap().get(family).isEmpty()) { - data.get(row).remove(family); - continue; - } - for (Cell kv : delete.getFamilyCellMap().get(family)) { - if (CellUtil.isDelete(kv)) { - data.get(row).get(kv.getFamilyArray()).clear(); - } else { - data.get(row).get(kv.getFamilyArray()).remove(kv.getQualifierArray()); - } - } - if (data.get(row).get(family).isEmpty()) { - data.get(row).remove(family); - } - } - if (data.get(row).isEmpty()) { - data.remove(row); - } - } - - /** - * {@inheritDoc} - */ - @Override - public void delete(List<Delete> deletes) throws IOException { - for (Delete delete : deletes) { - delete(delete); - } - } - - /** - * {@inheritDoc} - */ - @Override - public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) - throws IOException { - if (check(row, family, qualifier, value)) { - delete(delete); - return true; - } - return false; - } - - @Override - public boolean checkAndDelete(byte[] bytes, byte[] bytes1, byte[] bytes2, CompareFilter.CompareOp compareOp, - byte[] bytes3, Delete delete) throws IOException { - return false; - } - - /** - * {@inheritDoc} - */ - @Override - public Result increment(Increment increment) throws IOException { - throw new NotImplementedException(); - } - - /** - * {@inheritDoc} - */ - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException { - return incrementColumnValue(row, family, qualifier, amount, null); - } - - @Override - public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) - throws IOException { - return 0; - } - - /** - * {@inheritDoc} - */ - @Override - public void close() throws IOException { - } - - @Override - public CoprocessorRpcChannel coprocessorService(byte[] row) { - throw new NotImplementedException(); - - } - - @Override - public <T extends Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, - Batch.Call<T, R> callable) throws ServiceException, Throwable { - throw new NotImplementedException(); - - } - - @Override - public <T extends Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, - Batch.Call<T, R> callable, Batch.Callback<R> callback) throws ServiceException, Throwable { - throw new NotImplementedException(); - - } - - /** - * {@inheritDoc} - */ - @Override - public long getWriteBufferSize() { - throw new NotImplementedException(); - } - - /** - * {@inheritDoc} - */ - @Override - public void setWriteBufferSize(long writeBufferSize) throws IOException { - throw new NotImplementedException(); - - } - - @Override - public <R extends Message> Map<byte[], R> batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, - Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws ServiceException, Throwable { - throw new NotImplementedException(); - - } - - @Override - public <R extends Message> void batchCoprocessorService(Descriptors.MethodDescriptor methodDescriptor, - Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> 
callback) - throws ServiceException, Throwable { - throw new NotImplementedException(); - - } - - //@Override (only since 0.98.8) - public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareFilter.CompareOp compareOp, - byte[] value, RowMutations mutation) throws IOException { - throw new NotImplementedException(); - - } - - /*** - * - * All values are default - * - * **/ - @Override - public void setOperationTimeout(int i) { - - } - - @Override - public int getOperationTimeout() { - return 0; - } - - @Override - public int getRpcTimeout() { - return 0; - } - - @Override - public void setRpcTimeout(int i) { - - } - - @Override - public int getReadRpcTimeout() { - return 0; - } - - @Override - public void setReadRpcTimeout(int i) { - - } - - @Override - public int getWriteRpcTimeout() { - return 0; - } - - @Override - public void setWriteRpcTimeout(int i) { - - } -} \ No newline at end of file diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java index ced2934..74ac57b 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java @@ -18,20 +18,13 @@ package org.apache.kylin.storage.hbase.cube.v2; -import java.io.IOException; -import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Locale; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.atomic.AtomicReference; -import java.util.zip.DataFormatException; - +import com.google.common.collect.Lists; +import com.google.protobuf.ByteString; +import com.google.protobuf.HBaseZeroCopyByteString; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.coprocessor.Batch; -import org.apache.hadoop.hbase.ipc.BlockingRpcCallback; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.kylin.common.KylinConfig; @@ -62,9 +55,14 @@ import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.Cub import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.protobuf.ByteString; -import com.google.protobuf.HBaseZeroCopyByteString; +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.DataFormatException; public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @@ -180,7 +178,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC { @Override public void run() { - final String logHeader = String.format("<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest))); + final String logHeader = String.format(Locale.ROOT, "<sub-thread for Query %s GTScanRequest %s>", queryId, Integer.toHexString(System.identityHashCode(scanRequest))); final AtomicReference<RuntimeException> regionErrorHolder = new AtomicReference<>(); try { diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java index 62a62a5..2565897 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CubeHFileJob.java @@ -100,10 +100,10 @@ public class CubeHFileJob extends AbstractHadoopJob { Configuration hbaseConf = HBaseConfiguration.create(getConf()); - String hTableName = getOptionValue(OPTION_HTABLE_NAME).toUpperCase(); + String htable = getOptionValue(OPTION_HTABLE_NAME); connection = ConnectionFactory.createConnection(hbaseConf); - Table table = connection.getTable(TableName.valueOf(hTableName)); - RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(hTableName)); + Table table = connection.getTable(TableName.valueOf(htable)); + RegionLocator regionLocator = connection.getRegionLocator(TableName.valueOf(htable)); // Automatic config ! HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator); reconfigurePartitions(hbaseConf, partitionFilePath); diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java deleted file mode 100644 index 1f75660..0000000 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HFileOutputFormat3.java +++ /dev/null @@ -1,673 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
-*/ -package org.apache.kylin.storage.hbase.steps; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URLDecoder; -import java.net.URLEncoder; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.classification.InterfaceStability; -import org.apache.hadoop.hbase.client.HTable; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.io.compress.Compression; -import org.apache.hadoop.hbase.io.compress.Compression.Algorithm; -import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; -import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFileContext; -import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; -import org.apache.hadoop.hbase.mapreduce.KeyValueSerialization; -import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer; -import org.apache.hadoop.hbase.mapreduce.MutationSerialization; -import org.apache.hadoop.hbase.mapreduce.PutSortReducer; -import org.apache.hadoop.hbase.mapreduce.ResultSerialization; -import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; -import org.apache.hadoop.hbase.mapreduce.TextSortReducer; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.StoreFile; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; -import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner; -import org.apache.kylin.common.util.RandomUtil; - -import com.google.common.annotations.VisibleForTesting; - -/** - * Copied from HBase's org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2, with fix attempt on KYLIN-2788 - * - * Writes HFiles. Passed Cells must arrive in order. - * Writes current time as the sequence id for the file. Sets the major compacted - * attribute on created @{link {@link HFile}s. Calling write(null,null) will forcibly roll - * all HFiles being written. 
- * <p> - * Using this class as part of a MapReduce job is best done - * using {@link #configureIncrementalLoad(Job, Table, RegionLocator)}. - */ -@InterfaceAudience.Public -@InterfaceStability.Evolving -public class HFileOutputFormat3 extends FileOutputFormat<ImmutableBytesWritable, Cell> { - static Log LOG = LogFactory.getLog(HFileOutputFormat3.class); - - // The following constants are private since these are used by - // HFileOutputFormat2 to internally transfer data between job setup and - // reducer run using conf. - // These should not be changed by the client. - private static final String COMPRESSION_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.compression"; - private static final String BLOOM_TYPE_FAMILIES_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype"; - private static final String BLOCK_SIZE_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.blocksize"; - private static final String DATABLOCK_ENCODING_FAMILIES_CONF_KEY = "hbase.mapreduce.hfileoutputformat.families.datablock.encoding"; - - // This constant is public since the client can modify this when setting - // up their conf object and thus refer to this symbol. - // It is present for backwards compatibility reasons. Use it only to - // override the auto-detection of datablock encoding. - public static final String DATABLOCK_ENCODING_OVERRIDE_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding"; - - @Override - public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context) - throws IOException, InterruptedException { - return createRecordWriter(context, this.getOutputCommitter(context)); - } - - static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(final TaskAttemptContext context, - final OutputCommitter committer) throws IOException, InterruptedException { - - // Get the path of the temporary output file - final Path outputdir = ((FileOutputCommitter) committer).getWorkPath(); - final Configuration conf = context.getConfiguration(); - LOG.debug("Task output path: " + outputdir); - final FileSystem fs = outputdir.getFileSystem(conf); - // These configs. are from hbase-*.xml - final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE); - // Invented config. Add to hbase-*.xml if other than default compression. - final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName()); - final Algorithm defaultCompression = AbstractHFileWriter.compressionByName(defaultCompressionStr); - final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude", - false); - - // create a map from column family to the compression algorithm - final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf); - final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf); - final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf); - - String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY); - final Map<byte[], DataBlockEncoding> datablockEncodingMap = createFamilyDataBlockEncodingMap(conf); - final DataBlockEncoding overriddenEncoding; - if (dataBlockEncodingStr != null) { - overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr); - } else { - overriddenEncoding = null; - } - - return new RecordWriter<ImmutableBytesWritable, V>() { - // Map of families to writers and how much has been output on the writer. 
-
-    @Override
-    public RecordWriter<ImmutableBytesWritable, Cell> getRecordWriter(final TaskAttemptContext context)
-            throws IOException, InterruptedException {
-        return createRecordWriter(context, this.getOutputCommitter(context));
-    }
-
-    static <V extends Cell> RecordWriter<ImmutableBytesWritable, V> createRecordWriter(final TaskAttemptContext context,
-            final OutputCommitter committer) throws IOException, InterruptedException {
-
-        // Get the path of the temporary output file
-        final Path outputdir = ((FileOutputCommitter) committer).getWorkPath();
-        final Configuration conf = context.getConfiguration();
-        LOG.debug("Task output path: " + outputdir);
-        final FileSystem fs = outputdir.getFileSystem(conf);
-        // These configs. are from hbase-*.xml
-        final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE);
-        // Invented config. Add to hbase-*.xml if other than default compression.
-        final String defaultCompressionStr = conf.get("hfile.compression", Compression.Algorithm.NONE.getName());
-        final Algorithm defaultCompression = AbstractHFileWriter.compressionByName(defaultCompressionStr);
-        final boolean compactionExclude = conf.getBoolean("hbase.mapreduce.hfileoutputformat.compaction.exclude",
-                false);
-
-        // create a map from column family to the compression algorithm
-        final Map<byte[], Algorithm> compressionMap = createFamilyCompressionMap(conf);
-        final Map<byte[], BloomType> bloomTypeMap = createFamilyBloomTypeMap(conf);
-        final Map<byte[], Integer> blockSizeMap = createFamilyBlockSizeMap(conf);
-
-        String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_OVERRIDE_CONF_KEY);
-        final Map<byte[], DataBlockEncoding> datablockEncodingMap = createFamilyDataBlockEncodingMap(conf);
-        final DataBlockEncoding overriddenEncoding;
-        if (dataBlockEncodingStr != null) {
-            overriddenEncoding = DataBlockEncoding.valueOf(dataBlockEncodingStr);
-        } else {
-            overriddenEncoding = null;
-        }
-
-        return new RecordWriter<ImmutableBytesWritable, V>() {
-            // Map of families to writers and how much has been output on the writer.
-            private final Map<byte[], WriterLength> writers = new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
-            private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
-            private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
-            private boolean rollRequested = false;
-
-            @Override
-            public void write(ImmutableBytesWritable row, V cell) throws IOException {
-                KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-                if (row == null && kv == null) {
-                    rollWriters();
-                    return;
-                }
-                byte[] rowKey = CellUtil.cloneRow(kv);
-                long length = kv.getLength();
-                byte[] family = CellUtil.cloneFamily(kv);
-                WriterLength wl = this.writers.get(family);
-                if (wl == null) {
-                    fs.mkdirs(new Path(outputdir, Bytes.toString(family)));
-                }
-                if (wl != null && wl.written + length >= maxsize) {
-                    this.rollRequested = true;
-                }
-                if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
-                    rollWriters();
-                }
-                if (wl == null || wl.writer == null) {
-                    wl = getNewWriter(family, conf);
-                }
-                kv.updateLatestStamp(this.now);
-                wl.writer.append(kv);
-                wl.written += length;
-                this.previousRow = rowKey;
-            }
-
-            private void rollWriters() throws IOException {
-                for (WriterLength wl : this.writers.values()) {
-                    if (wl.writer != null) {
-                        LOG.info("Writer=" + wl.writer.getPath() + ((wl.written == 0) ? "" : ", wrote=" + wl.written));
-                        close(wl.writer);
-                    }
-                    wl.writer = null;
-                    wl.written = 0;
-                }
-                this.rollRequested = false;
-            }
-
-            @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "BX_UNBOXING_IMMEDIATELY_REBOXED", justification = "Not important")
-            private WriterLength getNewWriter(byte[] family, Configuration conf) throws IOException {
-                WriterLength wl = new WriterLength();
-                Path familydir = new Path(outputdir, Bytes.toString(family));
-                Algorithm compression = compressionMap.get(family);
-                compression = compression == null ? defaultCompression : compression;
-                BloomType bloomType = bloomTypeMap.get(family);
-                bloomType = bloomType == null ? BloomType.NONE : bloomType;
-                Integer blockSize = blockSizeMap.get(family);
-                blockSize = blockSize == null ? HConstants.DEFAULT_BLOCKSIZE : blockSize;
-                DataBlockEncoding encoding = overriddenEncoding;
-                encoding = encoding == null ? datablockEncodingMap.get(family) : encoding;
-                encoding = encoding == null ? DataBlockEncoding.NONE : encoding;
-                Configuration tempConf = new Configuration(conf);
-                tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
-                HFileContextBuilder contextBuilder = new HFileContextBuilder().withCompression(compression)
-                        .withChecksumType(HStore.getChecksumType(conf))
-                        .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blockSize);
-                contextBuilder.withDataBlockEncoding(encoding);
-                HFileContext hFileContext = contextBuilder.build();
-
-                wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs).withOutputDir(familydir)
-                        .withBloomType(bloomType).withComparator(KeyValue.COMPARATOR).withFileContext(hFileContext)
-                        .build();
-
-                this.writers.put(family, wl);
-                return wl;
-            }
-
-            private void close(final StoreFile.Writer w) throws IOException {
-                if (w != null) {
-                    w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
-                    w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY, Bytes.toBytes(context.getTaskAttemptID().toString()));
-                    w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY, Bytes.toBytes(true));
-                    w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY, Bytes.toBytes(compactionExclude));
-                    w.appendTrackedTimestampsToMetadata();
-                    w.close();
-                }
-            }
-
-            @Override
-            public void close(TaskAttemptContext c) throws IOException, InterruptedException {
-                for (WriterLength wl : this.writers.values()) {
-                    close(wl.writer);
-                }
-            }
-        };
-    }
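The write(null, null) escape hatch from the class Javadoc is reachable from a reducer through the normal task context, since KeyValueUtil.ensureKeyValue(null) yields null and the null/null branch above calls rollWriters(). A sketch of a reducer that exercises it (the class name and the roll-per-group policy are inventions for illustration; rolling this often would produce many small HFiles):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.mapreduce.Reducer;

    public class RollDemoReducer extends Reducer<ImmutableBytesWritable, KeyValue, ImmutableBytesWritable, Cell> {
        @Override
        protected void reduce(ImmutableBytesWritable row, Iterable<KeyValue> kvs, Context context)
                throws IOException, InterruptedException {
            for (KeyValue kv : kvs) {
                context.write(row, kv); // cells must arrive in sorted order
            }
            // Forces every open HFile writer in HFileOutputFormat3 to roll.
            context.write(null, null);
        }
    }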
-
-    /*
-     * Data structure to hold a Writer and amount of data written on it.
-     */
-    static class WriterLength {
-        long written = 0;
-        StoreFile.Writer writer = null;
-    }
-
-    /**
-     * Return the start keys of all of the regions in this table,
-     * as a list of ImmutableBytesWritable.
-     */
-    private static List<ImmutableBytesWritable> getRegionStartKeys(RegionLocator table) throws IOException {
-        byte[][] byteKeys = table.getStartKeys();
-        ArrayList<ImmutableBytesWritable> ret = new ArrayList<ImmutableBytesWritable>(byteKeys.length);
-        for (byte[] byteKey : byteKeys) {
-            ret.add(new ImmutableBytesWritable(byteKey));
-        }
-        return ret;
-    }
-
-    /**
-     * Write out a {@link SequenceFile} that can be read by
-     * {@link TotalOrderPartitioner} that contains the split points in startKeys.
-     */
-    @SuppressWarnings("deprecation")
-    private static void writePartitions(Configuration conf, Path partitionsPath, List<ImmutableBytesWritable> startKeys)
-            throws IOException {
-        LOG.info("Writing partition information to " + partitionsPath);
-        if (startKeys.isEmpty()) {
-            throw new IllegalArgumentException("No regions passed");
-        }
-
-        // We're generating a list of split points, and we don't ever
-        // have keys < the first region (which has an empty start key)
-        // so we need to remove it. Otherwise we would end up with an
-        // empty reducer with index 0
-        TreeSet<ImmutableBytesWritable> sorted = new TreeSet<ImmutableBytesWritable>(startKeys);
-
-        ImmutableBytesWritable first = sorted.first();
-        if (!first.equals(HConstants.EMPTY_BYTE_ARRAY)) {
-            throw new IllegalArgumentException("First region of table should have empty start key. Instead has: "
-                    + Bytes.toStringBinary(first.get()));
-        }
-        sorted.remove(first);
-
-        // Write the actual file
-        FileSystem fs = partitionsPath.getFileSystem(conf);
-        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, partitionsPath, ImmutableBytesWritable.class,
-                NullWritable.class);
-
-        try {
-            for (ImmutableBytesWritable startKey : sorted) {
-                writer.append(startKey, NullWritable.get());
-            }
-        } finally {
-            writer.close();
-        }
-    }
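The comment above deserves a worked example. For a table with three regions whose start keys are "" (empty), "g" and "p", dropping the empty first key leaves two split points, and three reducers then cover keys below "g", from "g" up to "p", and from "p" onward. A tiny self-contained illustration, using Strings instead of byte[] for readability:

    import java.util.Arrays;
    import java.util.TreeSet;

    public class SplitPointsDemo {
        public static void main(String[] args) {
            // Start keys as RegionLocator.getStartKeys() would report them.
            TreeSet<String> sorted = new TreeSet<>(Arrays.asList("", "g", "p"));
            sorted.remove(sorted.first()); // drop the empty first start key
            System.out.println(sorted);    // [g, p] -> split points for 3 reducers
        }
    }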
-
-    /**
-     * Configure a MapReduce Job to perform an incremental load into the given
-     * table. This
-     * <ul>
-     * <li>Inspects the table to configure a total order partitioner</li>
-     * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-     * <li>Sets the number of reduce tasks to match the current number of regions</li>
-     * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-     * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-     * PutSortReducer)</li>
-     * </ul>
-     * The user should be sure to set the map output value class to either KeyValue or Put before
-     * running this function.
-     *
-     * @deprecated Use {@link #configureIncrementalLoad(Job, Table, RegionLocator)} instead.
-     */
-    @Deprecated
-    public static void configureIncrementalLoad(Job job, HTable table) throws IOException {
-        configureIncrementalLoad(job, table.getTableDescriptor(), table.getRegionLocator());
-    }
-
-    /**
-     * Configure a MapReduce Job to perform an incremental load into the given
-     * table. This
-     * <ul>
-     * <li>Inspects the table to configure a total order partitioner</li>
-     * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-     * <li>Sets the number of reduce tasks to match the current number of regions</li>
-     * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-     * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-     * PutSortReducer)</li>
-     * </ul>
-     * The user should be sure to set the map output value class to either KeyValue or Put before
-     * running this function.
-     */
-    public static void configureIncrementalLoad(Job job, Table table, RegionLocator regionLocator) throws IOException {
-        configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
-    }
-
-    /**
-     * Configure a MapReduce Job to perform an incremental load into the given
-     * table. This
-     * <ul>
-     * <li>Inspects the table to configure a total order partitioner</li>
-     * <li>Uploads the partitions file to the cluster and adds it to the DistributedCache</li>
-     * <li>Sets the number of reduce tasks to match the current number of regions</li>
-     * <li>Sets the output key/value class to match HFileOutputFormat2's requirements</li>
-     * <li>Sets the reducer up to perform the appropriate sorting (either KeyValueSortReducer or
-     * PutSortReducer)</li>
-     * </ul>
-     * The user should be sure to set the map output value class to either KeyValue or Put before
-     * running this function.
-     */
-    public static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator)
-            throws IOException {
-        configureIncrementalLoad(job, tableDescriptor, regionLocator, HFileOutputFormat3.class);
-    }
-
-    static void configureIncrementalLoad(Job job, HTableDescriptor tableDescriptor, RegionLocator regionLocator,
-            Class<? extends OutputFormat<?, ?>> cls) throws IOException, UnsupportedEncodingException {
-        Configuration conf = job.getConfiguration();
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(KeyValue.class);
-        job.setOutputFormatClass(cls);
-
-        // Based on the configured map output class, set the correct reducer to properly
-        // sort the incoming values.
-        // TODO it would be nice to pick one or the other of these formats.
-        if (KeyValue.class.equals(job.getMapOutputValueClass())) {
-            job.setReducerClass(KeyValueSortReducer.class);
-        } else if (Put.class.equals(job.getMapOutputValueClass())) {
-            job.setReducerClass(PutSortReducer.class);
-        } else if (Text.class.equals(job.getMapOutputValueClass())) {
-            job.setReducerClass(TextSortReducer.class);
-        } else {
-            LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
-        }
-
-        conf.setStrings("io.serializations", conf.get("io.serializations"), MutationSerialization.class.getName(),
-                ResultSerialization.class.getName(), KeyValueSerialization.class.getName());
-
-        // Use table's region boundaries for TotalOrderPartitioner split points.
-        LOG.info("Looking up current regions for table " + tableDescriptor.getTableName());
-        List<ImmutableBytesWritable> startKeys = getRegionStartKeys(regionLocator);
-        LOG.info("Configuring " + startKeys.size() + " reduce partitions " + "to match current region count");
-        job.setNumReduceTasks(startKeys.size());
-
-        configurePartitioner(job, startKeys);
-        // Set compression algorithms based on column families
-        configureCompression(conf, tableDescriptor);
-        configureBloomType(tableDescriptor, conf);
-        configureBlockSize(tableDescriptor, conf);
-        configureDataBlockEncoding(tableDescriptor, conf);
-
-        TableMapReduceUtil.addDependencyJars(job);
-        TableMapReduceUtil.initCredentials(job);
-        LOG.info("Incremental table " + regionLocator.getName() + " output configured.");
-    }
-
-    public static void configureIncrementalLoadMap(Job job, Table table) throws IOException {
-        Configuration conf = job.getConfiguration();
-
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(KeyValue.class);
-        job.setOutputFormatClass(HFileOutputFormat3.class);
-
-        // Set compression algorithms based on column families
-        configureCompression(conf, table.getTableDescriptor());
-        configureBloomType(table.getTableDescriptor(), conf);
-        configureBlockSize(table.getTableDescriptor(), conf);
-        HTableDescriptor tableDescriptor = table.getTableDescriptor();
-        configureDataBlockEncoding(tableDescriptor, conf);
-
-        TableMapReduceUtil.addDependencyJars(job);
-        TableMapReduceUtil.initCredentials(job);
-        LOG.info("Incremental table " + table.getName() + " output configured.");
-    }
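Taken together, a bulk-load job would drive the methods above roughly like this (a minimal sketch, assuming a usable Configuration and an existing target table; the mapper class, table name, and output path are illustrative, not from this commit):

    Job job = Job.getInstance(conf, "example-hfile-job");
    job.setJarByClass(MyHFileMapper.class);       // hypothetical mapper
    job.setMapperClass(MyHFileMapper.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);   // selects KeyValueSortReducer
    FileOutputFormat.setOutputPath(job, new Path("/tmp/hfiles"));

    try (Connection conn = ConnectionFactory.createConnection(conf)) {
        TableName name = TableName.valueOf("KYLIN_EXAMPLE");
        HFileOutputFormat3.configureIncrementalLoad(job, conn.getTable(name), conn.getRegionLocator(name));
    }
    job.waitForCompletion(true);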
-
-    /**
-     * Runs inside the task to deserialize column family to compression algorithm
-     * map from the configuration.
-     *
-     * @param conf to read the serialized values from
-     * @return a map from column family to the configured compression algorithm
-     */
-    @VisibleForTesting
-    static Map<byte[], Algorithm> createFamilyCompressionMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, COMPRESSION_FAMILIES_CONF_KEY);
-        Map<byte[], Algorithm> compressionMap = new TreeMap<byte[], Algorithm>(Bytes.BYTES_COMPARATOR);
-        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-            Algorithm algorithm = AbstractHFileWriter.compressionByName(e.getValue());
-            compressionMap.put(e.getKey(), algorithm);
-        }
-        return compressionMap;
-    }
-
-    /**
-     * Runs inside the task to deserialize column family to bloom filter type
-     * map from the configuration.
-     *
-     * @param conf to read the serialized values from
-     * @return a map from column family to the configured bloom filter type
-     */
-    @VisibleForTesting
-    static Map<byte[], BloomType> createFamilyBloomTypeMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOOM_TYPE_FAMILIES_CONF_KEY);
-        Map<byte[], BloomType> bloomTypeMap = new TreeMap<byte[], BloomType>(Bytes.BYTES_COMPARATOR);
-        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-            BloomType bloomType = BloomType.valueOf(e.getValue());
-            bloomTypeMap.put(e.getKey(), bloomType);
-        }
-        return bloomTypeMap;
-    }
-
-    /**
-     * Runs inside the task to deserialize column family to block size
-     * map from the configuration.
-     *
-     * @param conf to read the serialized values from
-     * @return a map from column family to the configured block size
-     */
-    @VisibleForTesting
-    static Map<byte[], Integer> createFamilyBlockSizeMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, BLOCK_SIZE_FAMILIES_CONF_KEY);
-        Map<byte[], Integer> blockSizeMap = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
-        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-            Integer blockSize = Integer.parseInt(e.getValue());
-            blockSizeMap.put(e.getKey(), blockSize);
-        }
-        return blockSizeMap;
-    }
-
-    /**
-     * Runs inside the task to deserialize column family to data block encoding
-     * type map from the configuration.
-     *
-     * @param conf to read the serialized values from
-     * @return a map from column family to HFileDataBlockEncoder for the
-     *         configured data block type for the family
-     */
-    @VisibleForTesting
-    static Map<byte[], DataBlockEncoding> createFamilyDataBlockEncodingMap(Configuration conf) {
-        Map<byte[], String> stringMap = createFamilyConfValueMap(conf, DATABLOCK_ENCODING_FAMILIES_CONF_KEY);
-        Map<byte[], DataBlockEncoding> encoderMap = new TreeMap<byte[], DataBlockEncoding>(Bytes.BYTES_COMPARATOR);
-        for (Map.Entry<byte[], String> e : stringMap.entrySet()) {
-            encoderMap.put(e.getKey(), DataBlockEncoding.valueOf((e.getValue())));
-        }
-        return encoderMap;
-    }
-
-    /**
-     * Run inside the task to deserialize column family to given conf value map.
-     *
-     * @param conf to read the serialized values from
-     * @param confName conf key to read from the configuration
-     * @return a map of column family to the given configuration value
-     */
-    private static Map<byte[], String> createFamilyConfValueMap(Configuration conf, String confName) {
-        Map<byte[], String> confValMap = new TreeMap<byte[], String>(Bytes.BYTES_COMPARATOR);
-        String confVal = conf.get(confName, "");
-        for (String familyConf : confVal.split("&")) {
-            String[] familySplit = familyConf.split("=");
-            if (familySplit.length != 2) {
-                continue;
-            }
-            try {
-                confValMap.put(URLDecoder.decode(familySplit[0], "UTF-8").getBytes(StandardCharsets.UTF_8),
-                        URLDecoder.decode(familySplit[1], "UTF-8"));
-            } catch (UnsupportedEncodingException e) {
-                // will not happen with UTF-8 encoding
-                throw new AssertionError(e);
-            }
-        }
-        return confValMap;
-    }
-
-    /**
-     * Configure <code>job</code> with a TotalOrderPartitioner, partitioning against
-     * <code>splitPoints</code>. Cleans up the partitions file after the job exits.
-     */
-    static void configurePartitioner(Job job, List<ImmutableBytesWritable> splitPoints) throws IOException {
-        Configuration conf = job.getConfiguration();
-        // create the partitions file
-        FileSystem fs = FileSystem.get(conf);
-        Path partitionsPath = new Path(conf.get("hbase.fs.tmp.dir"), "partitions_" + RandomUtil.randomUUID());
-        fs.makeQualified(partitionsPath);
-        writePartitions(conf, partitionsPath, splitPoints);
-        fs.deleteOnExit(partitionsPath);
-
-        // configure job to use it
-        job.setPartitionerClass(TotalOrderPartitioner.class);
-        TotalOrderPartitioner.setPartitionFile(conf, partitionsPath);
-    }
-
-    /**
-     * Serialize column family to compression algorithm map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     *
-     * @param tableDescriptor to read the properties from
-     * @param conf to persist serialized values into
-     * @throws IOException
-     *             on failure to read column family descriptors
-     */
-    @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE")
-    @VisibleForTesting
-    static void configureCompression(Configuration conf, HTableDescriptor tableDescriptor)
-            throws UnsupportedEncodingException {
-        StringBuilder compressionConfigValue = new StringBuilder();
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                compressionConfigValue.append('&');
-            }
-            compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            compressionConfigValue.append('=');
-            compressionConfigValue.append(URLEncoder.encode(familyDescriptor.getCompression().getName(), "UTF-8"));
-        }
-        conf.set(COMPRESSION_FAMILIES_CONF_KEY, compressionConfigValue.toString());
-    }
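The serialize/deserialize pair above shares one wire format: URL-encoded family=value pairs joined by '&' under a single conf key. A self-contained round trip of that format (the family names and values are made up for the demo):

    import java.net.URLDecoder;
    import java.net.URLEncoder;

    public class FamilyConfFormatDemo {
        public static void main(String[] args) throws Exception {
            // Serialize, as configureCompression does per HColumnDescriptor:
            String confVal = URLEncoder.encode("F1", "UTF-8") + "=" + URLEncoder.encode("SNAPPY", "UTF-8")
                    + "&" + URLEncoder.encode("F2", "UTF-8") + "=" + URLEncoder.encode("NONE", "UTF-8");
            // Deserialize, as createFamilyConfValueMap does on the task side:
            for (String familyConf : confVal.split("&")) {
                String[] kv = familyConf.split("=");
                System.out.println(URLDecoder.decode(kv[0], "UTF-8") + " -> " + URLDecoder.decode(kv[1], "UTF-8"));
            }
        }
    }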
-
-    /**
-     * Serialize column family to block size map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     *
-     * @param tableDescriptor to read the properties from
-     * @param conf to persist serialized values into
-     *
-     * @throws IOException
-     *             on failure to read column family descriptors
-     */
-    @VisibleForTesting
-    static void configureBlockSize(HTableDescriptor tableDescriptor, Configuration conf)
-            throws UnsupportedEncodingException {
-        StringBuilder blockSizeConfigValue = new StringBuilder();
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                blockSizeConfigValue.append('&');
-            }
-            blockSizeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            blockSizeConfigValue.append('=');
-            blockSizeConfigValue.append(URLEncoder.encode(String.valueOf(familyDescriptor.getBlocksize()), "UTF-8"));
-        }
-        conf.set(BLOCK_SIZE_FAMILIES_CONF_KEY, blockSizeConfigValue.toString());
-    }
-
-    /**
-     * Serialize column family to bloom type map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     *
-     * @param tableDescriptor to read the properties from
-     * @param conf to persist serialized values into
-     *
-     * @throws IOException
-     *             on failure to read column family descriptors
-     */
-    @VisibleForTesting
-    static void configureBloomType(HTableDescriptor tableDescriptor, Configuration conf)
-            throws UnsupportedEncodingException {
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        StringBuilder bloomTypeConfigValue = new StringBuilder();
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                bloomTypeConfigValue.append('&');
-            }
-            bloomTypeConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            bloomTypeConfigValue.append('=');
-            String bloomType = familyDescriptor.getBloomFilterType().toString();
-            if (bloomType == null) {
-                bloomType = HColumnDescriptor.DEFAULT_BLOOMFILTER;
-            }
-            bloomTypeConfigValue.append(URLEncoder.encode(bloomType, "UTF-8"));
-        }
-        conf.set(BLOOM_TYPE_FAMILIES_CONF_KEY, bloomTypeConfigValue.toString());
-    }
-
-    /**
-     * Serialize column family to data block encoding map to configuration.
-     * Invoked while configuring the MR job for incremental load.
-     *
-     * @param tableDescriptor to read the properties from
-     * @param conf to persist serialized values into
-     * @throws IOException
-     *             on failure to read column family descriptors
-     */
-    @VisibleForTesting
-    static void configureDataBlockEncoding(HTableDescriptor tableDescriptor, Configuration conf)
-            throws UnsupportedEncodingException {
-        if (tableDescriptor == null) {
-            // could happen with mock table instance
-            return;
-        }
-        StringBuilder dataBlockEncodingConfigValue = new StringBuilder();
-        Collection<HColumnDescriptor> families = tableDescriptor.getFamilies();
-        int i = 0;
-        for (HColumnDescriptor familyDescriptor : families) {
-            if (i++ > 0) {
-                dataBlockEncodingConfigValue.append('&');
-            }
-            dataBlockEncodingConfigValue.append(URLEncoder.encode(familyDescriptor.getNameAsString(), "UTF-8"));
-            dataBlockEncodingConfigValue.append('=');
-            DataBlockEncoding encoding = familyDescriptor.getDataBlockEncoding();
-            if (encoding == null) {
-                encoding = DataBlockEncoding.NONE;
-            }
-            dataBlockEncodingConfigValue.append(URLEncoder.encode(encoding.toString(), "UTF-8"));
-        }
-        conf.set(DATABLOCK_ENCODING_FAMILIES_CONF_KEY, dataBlockEncodingConfigValue.toString());
-    }
-}
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
index c91428e..7b15eae 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/steps/CubeHFileMapperTest.java
@@ -18,12 +18,6 @@
 package org.apache.kylin.storage.hbase.steps;
 
-import static org.junit.Assert.assertEquals;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.io.Text;
@@ -34,6 +28,12 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
 /**
  * @author George Song (ysong1)
  *
@@ -69,14 +69,14 @@ public class CubeHFileMapperTest {
         Pair<RowKeyWritable, KeyValue> p2 = result.get(1);
 
         assertEquals(key, p1.getFirst());
-        assertEquals("cf1", new String(p1.getSecond().getFamily(), StandardCharsets.UTF_8));
-        assertEquals("usd_amt", new String(p1.getSecond().getQualifier(), StandardCharsets.UTF_8));
-        assertEquals("35.43", new String(p1.getSecond().getValue(), StandardCharsets.UTF_8));
+        assertEquals("cf1", new String(p1.getSecond().getFamilyArray(), p1.getSecond().getFamilyOffset(), p1.getSecond().getFamilyLength(), StandardCharsets.UTF_8));
+        assertEquals("usd_amt", new String(p1.getSecond().getQualifierArray(), p1.getSecond().getQualifierOffset(), p1.getSecond().getQualifierLength(), StandardCharsets.UTF_8));
+        assertEquals("35.43", new String(p1.getSecond().getValueArray(), p1.getSecond().getValueOffset(), p1.getSecond().getValueLength(), StandardCharsets.UTF_8));
 
         assertEquals(key, p2.getFirst());
-        assertEquals("cf1", new String(p2.getSecond().getFamily(), StandardCharsets.UTF_8));
-        assertEquals("item_count", new String(p2.getSecond().getQualifier(), StandardCharsets.UTF_8));
-        assertEquals("2", new String(p2.getSecond().getValue(), StandardCharsets.UTF_8));
+        assertEquals("cf1", new String(p2.getSecond().getFamilyArray(), p2.getSecond().getFamilyOffset(), p2.getSecond().getFamilyLength(), StandardCharsets.UTF_8));
+        assertEquals("item_count", new String(p2.getSecond().getQualifierArray(), p2.getSecond().getQualifierOffset(), p2.getSecond().getQualifierLength(), StandardCharsets.UTF_8));
+        assertEquals("2", new String(p2.getSecond().getValueArray(), p2.getSecond().getValueOffset(), p2.getSecond().getValueLength(), StandardCharsets.UTF_8));
     }
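The test change above exists because HBase 2 drops the long-deprecated KeyValue.getFamily()/getQualifier()/getValue() copy accessors; what survives are the Cell array/offset/length getters used here, or CellUtil's cloning helpers. A small self-contained comparison (values mirror the test fixture):

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;

    public class CellAccessDemo {
        public static void main(String[] args) {
            KeyValue kv = new KeyValue("row1".getBytes(StandardCharsets.UTF_8),
                    "cf1".getBytes(StandardCharsets.UTF_8),
                    "usd_amt".getBytes(StandardCharsets.UTF_8),
                    "35.43".getBytes(StandardCharsets.UTF_8));
            // CellUtil copies each component out of the backing array:
            System.out.println(new String(CellUtil.cloneFamily(kv), StandardCharsets.UTF_8));    // cf1
            System.out.println(new String(CellUtil.cloneQualifier(kv), StandardCharsets.UTF_8)); // usd_amt
            System.out.println(new String(CellUtil.cloneValue(kv), StandardCharsets.UTF_8));     // 35.43
        }
    }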