Hi Noam,

Currently, Beam doesn't support converting HCatRecords to Rows (or, in
your case, creating a Beam Schema from the Hive table schema) when the
Hive table has parameterized types.

We can use HCatFieldSchema [1] to create the Beam Schema from the Hive
table schema.
I have created a JIRA ticket to track this issue:
https://issues.apache.org/jira/browse/BEAM-9840

[1]:
https://github.com/apache/hive/blob/f37c5de6c32b9395d1b34fa3c02ed06d1bfbf6eb/hcatalog/core/src/main/java/org/apache/hive/hcatalog/data/schema/HCatFieldSchema.java#L34

PS: I am working on supporting this feature. It should be available in a
future release of Apache Beam.
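For context, the error below comes from the parameterized type string
("decimal(30,16)") not being recognized when building the Beam field. A
minimal, illustrative sketch of pulling the precision and scale out of such
a type string is shown here; this is not Beam's implementation, just the
kind of parsing the fix will need to do (via HCatFieldSchema's type info):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch only: extract precision and scale from a Hive
// type string such as "decimal(30,16)". The actual fix in Beam would
// read this information from HCatFieldSchema rather than a raw string.
public class HiveDecimalTypeString {

    private static final Pattern DECIMAL_PATTERN =
        Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");

    /** Returns {precision, scale} for "decimal(p,s)", or null if the
     *  string is not a parameterized decimal type. */
    public static int[] parseDecimal(String typeString) {
        Matcher m = DECIMAL_PATTERN.matcher(typeString.trim());
        if (!m.matches()) {
            return null;
        }
        return new int[] {
            Integer.parseInt(m.group(1)),  // precision, e.g. 30
            Integer.parseInt(m.group(2))   // scale, e.g. 16
        };
    }

    public static void main(String[] args) {
        int[] ps = parseDecimal("decimal(30,16)");
        System.out.println("precision=" + ps[0] + " scale=" + ps[1]);
    }
}
```

With the precision and scale in hand, the field could then be mapped to
Beam's logical decimal field type instead of throwing
UnsupportedOperationException.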

Regards,
Rahul

On Mon, Apr 27, 2020 at 6:57 PM Gershi, Noam <noam.ger...@citi.com> wrote:

> Hi
>
> Using HCatalogIO as a source - I am trying to read column tables.
>
>
>
> Code:
>
>
>
> PCollection<HCatRecord> hcatRecords = input
>
>                 .apply(HCatalogIO.read()
>
>                         .withConfigProperties(configProperties)
>
>                         .withDatabase("db-name")
>
>                         .withTable("my-table-name"));
>
> ...
>
> HCatalogBeamSchema hcatSchema =
> HCatalogBeamSchema.create(ImmutableMap.of("table", "my-table-name"));
>
> Schema schema = hcatSchema.getTableSchema("db-name",
> "my-table-name").get();
>
> List<Schema.Field> fields = schema.getFields();
>
>
>
>
>
> I get:
>
>
>
> 20/04/27 09:12:16 INFO LineBufferedStream: Caused by:
> java.lang.UnsupportedOperationException: The type 'decimal(30,16)' of field
> 'amount' is not supported.
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamField(SchemaUtils.java:60)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> org.apache.beam.sdk.io.hcatalog.SchemaUtils.toBeamSchema(SchemaUtils.java:53)
>
> 20/04/27 09:12:16 INFO LineBufferedStream:      at
> org.apache.beam.sdk.io.hcatalog.HCatalogBeamSchema.getTableSchema(HCatalogBeamSchema.java:83)
>
>
>
> Thanks in advance,
>
> Noam
>
>
>
