This is an automated email from the ASF dual-hosted git repository.

aromanenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f3b0ac7  [BEAM-7056] Include partition keys in beam schema resolution
     new 020a820  Merge pull request #8276: [BEAM-7056] Include partition keys in beam schema resolution
f3b0ac7 is described below

commit f3b0ac7caddd5935a1e2bd92522287ea17cb6ad0
Author: Jozef Vilcek <jozo.vil...@gmail.com>
AuthorDate: Thu Apr 11 11:44:07 2019 +0200

    [BEAM-7056] Include partition keys in beam schema resolution
---
 .../beam/sdk/io/hcatalog/HCatalogBeamSchema.java   |  7 ++++++-
 .../sdk/io/hcatalog/HCatalogBeamSchemaTest.java    | 24 ++++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java
index 1b1705c..b43bc9e 100644
--- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java
+++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java
@@ -18,13 +18,16 @@
 package org.apache.beam.sdk.io.hcatalog;
 
 import com.sun.istack.Nullable;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 
 /**
@@ -75,7 +78,9 @@ public class HCatalogBeamSchema {
   public Optional<Schema> getTableSchema(String db, String table) {
     try {
       org.apache.hadoop.hive.metastore.api.Table metastoreTable = 
metastore.getTable(db, table);
-      Schema schema = 
SchemaUtils.toBeamSchema(metastoreTable.getSd().getCols());
+      List<FieldSchema> fields = 
Lists.newArrayList(metastoreTable.getSd().getCols());
+      fields.addAll(metastoreTable.getPartitionKeys());
+      Schema schema = SchemaUtils.toBeamSchema(fields);
       return Optional.of(schema);
     } catch (NoSuchObjectException e) {
       return Optional.absent();
diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java
index f993740..8342fba 100644
--- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java
+++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java
@@ -38,6 +38,8 @@ import org.junit.rules.TemporaryFolder;
 
 /** Unit tests for {@link HCatalogBeamSchema}. */
 public class HCatalogBeamSchemaTest implements Serializable {
+
+  private static final String TEST_TABLE_PARTITIONED = TEST_TABLE + 
"_partitioned";
   @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
 
   private static EmbeddedMetastoreService service;
@@ -87,6 +89,22 @@ public class HCatalogBeamSchemaTest implements Serializable {
   }
 
   @Test
+  public void testGetTableSchemaForPartitionedTable() throws Exception {
+    HCatalogBeamSchema hcatSchema = 
HCatalogBeamSchema.create(service.getHiveConfAsMap());
+    Schema schema = hcatSchema.getTableSchema(TEST_DATABASE, 
TEST_TABLE_PARTITIONED).get();
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addNullableField("mycol1", Schema.FieldType.STRING)
+            .addNullableField("mycol2", Schema.FieldType.INT32)
+            .addNullableField("part1", Schema.FieldType.STRING)
+            .addNullableField("part2", Schema.FieldType.INT32)
+            .build();
+
+    assertEquals(expectedSchema, schema);
+  }
+
+  @Test
   public void testDoesntHaveTable() throws Exception {
     HCatalogBeamSchema hcatSchema = 
HCatalogBeamSchema.create(service.getHiveConfAsMap());
     assertFalse(hcatSchema.getTableSchema(TEST_DATABASE, 
"non-existent-table").isPresent());
@@ -99,6 +117,12 @@ public class HCatalogBeamSchemaTest implements Serializable {
 
   private void reCreateTestTable() throws CommandNeedRetryException {
     service.executeQuery("drop table " + TEST_TABLE);
+    service.executeQuery("drop table " + TEST_TABLE_PARTITIONED);
     service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, 
mycol2 int)");
+    service.executeQuery(
+        "create table "
+            + TEST_TABLE_PARTITIONED
+            + "(mycol1 string, mycol2 int) "
+            + "partitioned by (part1 string, part2 int)");
   }
 }

Reply via email to