This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new f3b0ac7 [BEAM-7056] Include partition keys in beam schema resolution new 020a820 Merge pull request #8276: [BEAM-7056] Include partition keys in beam schema resolution f3b0ac7 is described below commit f3b0ac7caddd5935a1e2bd92522287ea17cb6ad0 Author: Jozef Vilcek <jozo.vil...@gmail.com> AuthorDate: Thu Apr 11 11:44:07 2019 +0200 [BEAM-7056] Include partition keys in beam schema resolution --- .../beam/sdk/io/hcatalog/HCatalogBeamSchema.java | 7 ++++++- .../sdk/io/hcatalog/HCatalogBeamSchemaTest.java | 24 ++++++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java index 1b1705c..b43bc9e 100644 --- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchema.java @@ -18,13 +18,16 @@ package org.apache.beam.sdk.io.hcatalog; import com.sun.istack.Nullable; +import java.util.List; import java.util.Map; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Optional; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; /** @@ -75,7 +78,9 @@ public class HCatalogBeamSchema { public Optional<Schema> getTableSchema(String db, String table) { try { org.apache.hadoop.hive.metastore.api.Table metastoreTable = metastore.getTable(db, table); - Schema schema = SchemaUtils.toBeamSchema(metastoreTable.getSd().getCols()); + List<FieldSchema> fields = Lists.newArrayList(metastoreTable.getSd().getCols()); + fields.addAll(metastoreTable.getPartitionKeys()); + Schema schema = SchemaUtils.toBeamSchema(fields); return Optional.of(schema); } catch (NoSuchObjectException e) { return Optional.absent(); diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java index f993740..8342fba 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogBeamSchemaTest.java @@ -38,6 +38,8 @@ import org.junit.rules.TemporaryFolder; /** Unit tests for {@link HCatalogBeamSchema}. */ public class HCatalogBeamSchemaTest implements Serializable { + + private static final String TEST_TABLE_PARTITIONED = TEST_TABLE + "_partitioned"; @ClassRule public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder(); private static EmbeddedMetastoreService service; @@ -87,6 +89,22 @@ public class HCatalogBeamSchemaTest implements Serializable { } @Test + public void testGetTableSchemaForPartitionedTable() throws Exception { + HCatalogBeamSchema hcatSchema = HCatalogBeamSchema.create(service.getHiveConfAsMap()); + Schema schema = hcatSchema.getTableSchema(TEST_DATABASE, TEST_TABLE_PARTITIONED).get(); + + Schema expectedSchema = + Schema.builder() + .addNullableField("mycol1", Schema.FieldType.STRING) + .addNullableField("mycol2", Schema.FieldType.INT32) + .addNullableField("part1", Schema.FieldType.STRING) + .addNullableField("part2", Schema.FieldType.INT32) + .build(); + + assertEquals(expectedSchema, schema); + } + + @Test public void testDoesntHaveTable() throws Exception { HCatalogBeamSchema hcatSchema = HCatalogBeamSchema.create(service.getHiveConfAsMap()); assertFalse(hcatSchema.getTableSchema(TEST_DATABASE, "non-existent-table").isPresent()); @@ -99,6 +117,12 @@ public class HCatalogBeamSchemaTest implements Serializable { private void reCreateTestTable() throws CommandNeedRetryException { service.executeQuery("drop table " + TEST_TABLE); + service.executeQuery("drop table " + TEST_TABLE_PARTITIONED); service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)"); + service.executeQuery( + "create table " + + TEST_TABLE_PARTITIONED + + "(mycol1 string, mycol2 int) " + + "partitioned by (part1 string, part2 int)"); } }