This is an automated email from the ASF dual-hosted git repository. iemejia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 9bbe6f5 [BEAM-7450] Support unbounded reads with HCatalogIO new 19804ac Merge pull request #8718: [BEAM-7450] Support unbounded reads with HCatalogIO 9bbe6f5 is described below commit 9bbe6f523aac427958a4e99e4d729a80b105e63d Author: Ankit Jhalaria <ajhala...@godaddy.com> AuthorDate: Wed May 29 12:33:01 2019 -0700 [BEAM-7450] Support unbounded reads with HCatalogIO --- .../apache/beam/sdk/io/hcatalog/HCatalogIO.java | 91 +++++++++++++++-- .../apache/beam/sdk/io/hcatalog/HCatalogUtils.java | 87 ++++++++++++++++ .../beam/sdk/io/hcatalog/PartitionPollerFn.java | 56 +++++++++++ .../beam/sdk/io/hcatalog/PartitionReaderFn.java | 111 +++++++++++++++++++++ .../beam/sdk/io/hcatalog/HCatalogIOTest.java | 80 +++++++++++++++ 5 files changed, 414 insertions(+), 11 deletions(-) diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java index 73518f6..05b43c6 100644 --- a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogIO.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import java.util.NoSuchElementException; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; @@ -33,15 +32,17 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.hadoop.WritableCoder; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Watch; +import org.apache.beam.sdk.transforms.Watch.Growth.TerminationCondition; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.metadata.Table; @@ -58,6 +59,7 @@ import org.apache.hive.hcatalog.data.transfer.ReadEntity; import org.apache.hive.hcatalog.data.transfer.ReaderContext; import org.apache.hive.hcatalog.data.transfer.WriteEntity; import org.apache.hive.hcatalog.data.transfer.WriterContext; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -83,6 +85,20 @@ import org.slf4j.LoggerFactory; * .withFilter(filterString) //optional, may be specified if the table is partitioned * }</pre> * + * <p>HCatalog source supports reading of HCatRecord in an unbounded mode. When run in an unbounded + * mode, HCatalogIO will continuously poll for new partitions and read that data. If provided with a + * termination condition, it will stop reading data after the condition is met. + * + * <pre>{@code + * pipeline + * .apply(HCatalogIO.read() + * .withConfigProperties(configProperties) + * .withDatabase("default") //optional, assumes default if none specified + * .withTable("employee") + * .withPollingInterval(Duration.millis(15000)) // poll for new partitions every 15 seconds + * .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000)))) //optional + * }</pre> + * * <h3>Writing using HCatalog</h3> * * <p>HCatalog sink supports writing of HCatRecord to a HCatalog managed source, for eg. Hive. @@ -120,7 +136,10 @@ public class HCatalogIO { /** Read data from Hive. */ public static Read read() { - return new AutoValue_HCatalogIO_Read.Builder().setDatabase(DEFAULT_DATABASE).build(); + return new AutoValue_HCatalogIO_Read.Builder() + .setDatabase(DEFAULT_DATABASE) + .setPartitionCols(new ArrayList<>()) + .build(); } private HCatalogIO() {} @@ -129,6 +148,7 @@ public class HCatalogIO { @VisibleForTesting @AutoValue public abstract static class Read extends PTransform<PBegin, PCollection<HCatRecord>> { + @Nullable abstract Map<String, String> getConfigProperties(); @@ -147,6 +167,15 @@ public class HCatalogIO { @Nullable abstract Integer getSplitId(); + @Nullable + abstract Duration getPollingInterval(); + + @Nullable + abstract List<String> getPartitionCols(); + + @Nullable + abstract TerminationCondition<Read, ?> getTerminationCondition(); + abstract Builder toBuilder(); @AutoValue.Builder @@ -163,6 +192,12 @@ public class HCatalogIO { abstract Builder setContext(ReaderContext context); + abstract Builder setPollingInterval(Duration pollingInterval); + + abstract Builder setPartitionCols(List<String> partitionCols); + + abstract Builder setTerminationCondition(TerminationCondition<Read, ?> terminationCondition); + abstract Read build(); } @@ -186,6 +221,28 @@ public class HCatalogIO { return toBuilder().setFilter(filter).build(); } + /** + * If specified, polling for new partitions will happen at this periodicity. The returned + * PCollection will be unbounded. However if a withTerminationCondition is set along with + * pollingInterval, polling will stop after the termination condition has been met. + */ + public Read withPollingInterval(Duration pollingInterval) { + return toBuilder().setPollingInterval(pollingInterval).build(); + } + + /** Set the names of the columns that are partitions. */ + public Read withPartitionCols(List<String> partitionCols) { + return toBuilder().setPartitionCols(partitionCols).build(); + } + + /** + * If specified, the poll function will stop polling after the termination condition has been + * satisfied. + */ + public Read withTerminationCondition(TerminationCondition<Read, ?> terminationCondition) { + return toBuilder().setTerminationCondition(terminationCondition).build(); + } + Read withSplitId(int splitId) { checkArgument(splitId >= 0, "Invalid split id-" + splitId); return toBuilder().setSplitId(splitId).build(); @@ -196,11 +253,27 @@ public class HCatalogIO { } @Override + @SuppressWarnings("deprecation") public PCollection<HCatRecord> expand(PBegin input) { checkArgument(getTable() != null, "withTable() is required"); checkArgument(getConfigProperties() != null, "withConfigProperties() is required"); - - return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this))); + Watch.Growth<Read, Integer, Integer> growthFn; + if (getPollingInterval() != null) { + growthFn = Watch.growthOf(new PartitionPollerFn()).withPollInterval(getPollingInterval()); + if (getTerminationCondition() != null) { + growthFn = growthFn.withTerminationPerInput(getTerminationCondition()); + } + return input + .apply("ConvertToReadRequest", Create.of(this)) + .apply("WatchForNewPartitions", growthFn) + .apply("PartitionReader", ParDo.of(new PartitionReaderFn(getConfigProperties()))); + } else { + // Treat as Bounded + checkArgument( + getTerminationCondition() == null, + "withTerminationCondition() is not required when using in bounded reads mode"); + return input.apply(org.apache.beam.sdk.io.Read.from(new BoundedHCatalogSource(this))); + } } @Override @@ -244,14 +317,10 @@ public class HCatalogIO { */ @Override public long getEstimatedSizeBytes(PipelineOptions pipelineOptions) throws Exception { - Configuration conf = new Configuration(); - for (Entry<String, String> entry : spec.getConfigProperties().entrySet()) { - conf.set(entry.getKey(), entry.getValue()); - } IMetaStoreClient client = null; try { - HiveConf hiveConf = HCatUtil.getHiveConf(conf); - client = HCatUtil.getHiveMetastoreClient(hiveConf); + HiveConf hiveConf = HCatalogUtils.createHiveConf(spec); + client = HCatalogUtils.createMetaStoreClient(hiveConf); Table table = HCatUtil.getTable(client, spec.getDatabase(), spec.getTable()); return StatsUtils.getFileSizeForTable(hiveConf, table); } finally { diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java new file mode 100644 index 0000000..bf3638e --- /dev/null +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/HCatalogUtils.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hcatalog; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.ql.stats.StatsUtils; +import org.apache.hive.hcatalog.common.HCatUtil; + +/** Utility classes to enable meta store conf/client creation. */ +public class HCatalogUtils { + + private static final int DESIRED_BUNDLE_SIZE_BYTES = 134217728; // 128 MB + + static IMetaStoreClient createMetaStoreClient(Configuration conf) + throws IOException, MetaException { + final HiveConf hiveConf = HCatUtil.getHiveConf(conf); + return HCatUtil.getHiveMetastoreClient(hiveConf); + } + + static HiveConf createHiveConf(Read readRequest) throws IOException { + Configuration conf = createConfiguration(readRequest.getConfigProperties()); + return HCatUtil.getHiveConf(conf); + } + + static int getSplitCount(Read readRequest, Partition partitionToRead) throws Exception { + int desiredSplitCount = 1; + long estimatedSizeBytes = getFileSizeForPartition(readRequest, partitionToRead); + if (estimatedSizeBytes > 0) { + desiredSplitCount = (int) Math.ceil((double) estimatedSizeBytes / DESIRED_BUNDLE_SIZE_BYTES); + } + return desiredSplitCount; + } + + static Configuration createConfiguration(Map<String, String> configProperties) { + Configuration conf = new Configuration(); + for (Map.Entry<String, String> entry : configProperties.entrySet()) { + conf.set(entry.getKey(), entry.getValue()); + } + return conf; + } + + private static long getFileSizeForPartition(Read readRequest, Partition partitionToRead) + throws Exception { + IMetaStoreClient client = null; + try { + HiveConf hiveConf = HCatalogUtils.createHiveConf(readRequest); + client = HCatalogUtils.createMetaStoreClient(hiveConf); + List<org.apache.hadoop.hive.ql.metadata.Partition> p = new ArrayList<>(); + Table table = HCatUtil.getTable(client, readRequest.getDatabase(), readRequest.getTable()); + final org.apache.hadoop.hive.ql.metadata.Partition partition = + new org.apache.hadoop.hive.ql.metadata.Partition(table, partitionToRead); + p.add(partition); + final List<Long> fileSizeForPartitions = StatsUtils.getFileSizeForPartitions(hiveConf, p); + return fileSizeForPartitions.get(0); + } finally { + // IMetaStoreClient is not AutoCloseable, closing it manually + if (client != null) { + client.close(); + } + } + } +} diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java new file mode 100644 index 0000000..2e40710 --- /dev/null +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionPollerFn.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hcatalog; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read; +import org.apache.beam.sdk.transforms.Watch.Growth.PollFn; +import org.apache.beam.sdk.transforms.Watch.Growth.PollResult; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.joda.time.Instant; + +/** Return the list of current partitions present. */ +class PartitionPollerFn extends PollFn<Read, Integer> { + private transient IMetaStoreClient metaStoreClient; + + @Override + public PollResult<Integer> apply(Read element, Context c) throws Exception { + final Configuration conf = HCatalogUtils.createConfiguration(element.getConfigProperties()); + metaStoreClient = HCatalogUtils.createMetaStoreClient(conf); + final Instant now = Instant.now(); + final PollResult<Integer> pollResult = + PollResult.incomplete(now, getPartitionIndices(element)).withWatermark(now); + if (metaStoreClient != null) { + metaStoreClient.close(); + } + return pollResult; + } + + private List<Integer> getPartitionIndices(Read read) throws Exception { + return IntStream.range( + 0, + metaStoreClient + .listPartitions(read.getDatabase(), read.getTable(), Short.MAX_VALUE) + .size()) + .boxed() + .collect(Collectors.toList()); + } +} diff --git a/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java new file mode 100644 index 0000000..70d0529 --- /dev/null +++ b/sdks/java/io/hcatalog/src/main/java/org/apache/beam/sdk/io/hcatalog/PartitionReaderFn.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.hcatalog; + +import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.io.hcatalog.HCatalogIO.Read; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hive.hcatalog.common.HCatConstants; +import org.apache.hive.hcatalog.data.HCatRecord; +import org.apache.hive.hcatalog.data.transfer.DataTransferFactory; +import org.apache.hive.hcatalog.data.transfer.HCatReader; +import org.apache.hive.hcatalog.data.transfer.ReadEntity; +import org.apache.hive.hcatalog.data.transfer.ReaderContext; + +/** Reads partition at a given index. */ +class PartitionReaderFn extends DoFn<KV<Read, Integer>, HCatRecord> { + private transient IMetaStoreClient metaStoreClient; + private Map<String, String> configProperties; + + public PartitionReaderFn(Map<String, String> configProperties) { + this.configProperties = configProperties; + } + + private ReaderContext getReaderContext(Read readRequest, Integer partitionIndexToRead) + throws Exception { + final List<Partition> partitions = + metaStoreClient.listPartitions( + readRequest.getDatabase(), readRequest.getTable(), Short.MAX_VALUE); + final Partition partition = partitions.get(partitionIndexToRead); + checkArgument( + partition != null, "Unable to find a partition to read at index " + partitionIndexToRead); + + final int desiredSplitCount = HCatalogUtils.getSplitCount(readRequest, partition); + final List<String> values = partition.getValues(); + final List<String> partitionCols = readRequest.getPartitionCols(); + checkArgument( + values.size() == partitionCols.size(), + "Number of input partitions should be equal to the values of list partition values."); + + List<String> filter = new ArrayList<>(); + for (int i = 0; i < partitionCols.size(); i++) { + filter.add(partitionCols.get(i) + "=" + "'" + values.get(i) + "'"); + } + final String filterString = String.join(" and ", filter); + + ReadEntity entity = + new ReadEntity.Builder() + .withDatabase(readRequest.getDatabase()) + .withFilter(filterString) + .withTable(readRequest.getTable()) + .build(); + // pass the 'desired' split count as an hint to the API + Map<String, String> configProps = new HashMap<>(readRequest.getConfigProperties()); + configProps.put( + HCatConstants.HCAT_DESIRED_PARTITION_NUM_SPLITS, String.valueOf(desiredSplitCount)); + return DataTransferFactory.getHCatReader(entity, configProps).prepareRead(); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + final Read readRequest = c.element().getKey(); + final Integer partitionIndexToRead = c.element().getValue(); + ReaderContext readerContext = getReaderContext(readRequest, partitionIndexToRead); + for (int i = 0; i < readerContext.numSplits(); i++) { + HCatReader reader = DataTransferFactory.getHCatReader(readerContext, i); + Iterator<HCatRecord> hcatIterator = reader.read(); + while (hcatIterator.hasNext()) { + final HCatRecord record = hcatIterator.next(); + c.output(record); + } + } + } + + @Setup + public void setup() throws Exception { + final Configuration conf = HCatalogUtils.createConfiguration(configProperties); + metaStoreClient = HCatalogUtils.createMetaStoreClient(conf); + } + + @Teardown + public void teardown() { + if (metaStoreClient != null) { + metaStoreClient.close(); + } + } +} diff --git a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java index da631a3..7f925ef 100644 --- a/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java +++ b/sdks/java/io/hcatalog/src/test/java/org/apache/beam/sdk/io/hcatalog/HCatalogIOTest.java @@ -40,8 +40,12 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.io.BoundedSource; +import org.apache.beam.sdk.io.hadoop.WritableCoder; import org.apache.beam.sdk.io.hcatalog.HCatalogIO.BoundedHCatalogSource; import org.apache.beam.sdk.io.hcatalog.test.EmbeddedMetastoreService; import org.apache.beam.sdk.options.PipelineOptions; @@ -52,11 +56,16 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Watch; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.CommandNeedRetryException; +import org.apache.hive.hcatalog.data.DefaultHCatRecord; import org.apache.hive.hcatalog.data.HCatRecord; import org.apache.hive.hcatalog.data.transfer.ReaderContext; +import org.joda.time.Duration; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -96,6 +105,9 @@ public class HCatalogIOTest implements Serializable { prepareTestData(); } else if (description.getAnnotation(NeedsEmptyTestTables.class) != null) { reCreateTestTable(); + } else if (description.getAnnotation(NeedsEmptyTestTablesForUnboundedReads.class) + != null) { + reCreateTestTableForUnboundedReads(); } base.evaluate(); } @@ -110,6 +122,11 @@ public class HCatalogIOTest implements Serializable { @Target({ElementType.METHOD}) private @interface NeedsTestData {} + /** Use this annotation to setup complete test data(table populated with unbounded records). */ + @Retention(RetentionPolicy.RUNTIME) + @Target({ElementType.METHOD}) + private @interface NeedsEmptyTestTablesForUnboundedReads {} + /** Use this annotation to setup test tables alone(empty tables, no records are populated). */ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD}) @@ -163,6 +180,56 @@ public class HCatalogIOTest implements Serializable { readAfterWritePipeline.run(); } + private Map<String, String> getPartitions() { + Map<String, String> partitions = new HashMap<>(); + partitions.put("load_date", "2019-05-14T23:28:04.425Z"); + partitions.put("product_type", "1"); + return partitions; + } + + /** Perform end-to-end test of Write-then-Read operation. */ + @Test + @NeedsEmptyTestTablesForUnboundedReads + public void testWriteThenUnboundedReadSuccess() throws Exception { + + defaultPipeline + .apply(Create.of(buildHCatRecords(TEST_RECORDS_COUNT))) + .apply( + HCatalogIO.write() + .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) + .withDatabase(TEST_DATABASE) + .withTable(TEST_TABLE) + .withPartition(getPartitions()) + .withBatchSize(512L)); + defaultPipeline.run(); + final ImmutableList<String> partitions = ImmutableList.of("load_date", "product_type"); + final PCollection<HCatRecord> data = + readAfterWritePipeline + .apply( + "ReadData", + HCatalogIO.read() + .withConfigProperties(getConfigPropertiesAsMap(service.getHiveConf())) + .withDatabase(TEST_DATABASE) + .withPartitionCols(partitions) + .withTable(TEST_TABLE) + .withPollingInterval(Duration.millis(15000)) + .withTerminationCondition(Watch.Growth.afterTotalOf(Duration.millis(60000)))) + .setCoder((Coder) WritableCoder.of(DefaultHCatRecord.class)); + + final PCollection<String> output = + data.apply( + ParDo.of( + new DoFn<HCatRecord, String>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().get(0).toString()); + } + })); + + PAssert.that(output).containsInAnyOrder(getExpectedRecords(TEST_RECORDS_COUNT)); + readAfterWritePipeline.run(); + } + /** Test of Write to a non-existent table. */ @Test public void testWriteFailureTableDoesNotExist() { @@ -276,6 +343,19 @@ public class HCatalogIOTest implements Serializable { service.executeQuery("create table " + TEST_TABLE + "(mycol1 string, mycol2 int)"); } + private void reCreateTestTableForUnboundedReads() throws CommandNeedRetryException { + service.executeQuery("drop table " + TEST_TABLE); + service.executeQuery( + "create table " + + TEST_TABLE + + "(mycol1 string, mycol2 int) " + + "partitioned by (load_date string, product_type string)"); + service.executeQuery( + "ALTER TABLE " + + TEST_TABLE + + " ADD PARTITION (load_date='2019-05-14T23:28:04.425Z', product_type='1')"); + } + private void prepareTestData() throws Exception { reCreateTestTable(); insertTestData(getConfigPropertiesAsMap(service.getHiveConf()));