It looks like Calcite uses REAL for Float values in the SQL transform, while 
Beam SQL (if I'm not mistaken) has no conversion from the Calcite REAL type to 
any Beam schema field type.

A workaround could be to add such a conversion (REAL -> FLOAT) to 
CalciteUtils.java.
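
To illustrate the idea, here is a minimal standalone sketch of the lookup failure and of the proposed fix. It is not the actual CalciteUtils code: the class RealToFloatSketch, the string-keyed map, and the method addRealMapping are hypothetical names used only for illustration, and the real change would go into Beam's own Calcite-to-Beam type mapping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a Calcite-to-Beam type lookup; NOT Beam's real code.
class RealToFloatSketch {
    // Assumed, simplified mapping from Calcite type names to Beam field type names.
    static final Map<String, String> CALCITE_TO_BEAM = new HashMap<>();
    static {
        CALCITE_TO_BEAM.put("INTEGER", "INT32");
        CALCITE_TO_BEAM.put("FLOAT", "FLOAT");
        CALCITE_TO_BEAM.put("DOUBLE", "DOUBLE");
    }

    // Fails for unmapped types with a message like the one in the stack trace.
    static String toFieldType(String calciteType) {
        String beamType = CALCITE_TO_BEAM.get(calciteType);
        if (beamType == null) {
            throw new IllegalArgumentException(
                "Cannot find a matching Beam FieldType for Calcite type: " + calciteType);
        }
        return beamType;
    }

    // The proposed workaround: treat Calcite REAL as Beam FLOAT.
    static void addRealMapping() {
        CALCITE_TO_BEAM.put("REAL", "FLOAT");
    }

    public static void main(String[] args) {
        try {
            toFieldType("REAL");
        } catch (IllegalArgumentException e) {
            System.out.println("Before the fix: " + e.getMessage());
        }
        addRealMapping();
        System.out.println("After the fix: REAL -> " + toFieldType("REAL"));
    }
}
```

Until such a conversion exists in Beam itself, the lookup for REAL fails in the same way as the first call in main.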

—
Alexey

> On 12 Oct 2023, at 20:19, Balogh, György <[email protected]> wrote:
> 
> Hi,
> I'm using Beam 2.51.0.
> I'm trying to use a UDF to transform float arrays and got the following error:
> 
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot find a matching Beam FieldType for Calcite type: REAL
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
> at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
> at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
> at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
> at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
> at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
> at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
> at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
> at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
> at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
> at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)
> 
> 
> This is my code:
> 
> public static class TestUDF implements SerializableFunction<List<Float>, Float> {
>     @Override
>     public Float apply(List<Float> fv) {
>         float sum = 0;
>         if(fv != null)
>             for (Float a : fv)
>                 sum += a;
>         return sum;
>     }
> }
> 
> public static void example6() {
>     System.out.println("Example 6");
> 
>     Schema rowSchema = Schema.builder()
>             .addField("ind", Schema.FieldType.INT32)
>             .addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
>             .build();
> 
>     Pipeline p = createPipeline();
>     p
>             .apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
>             .apply(ParDo.of(new DoFn<Integer, Row>() {
>                 @ProcessElement
>                 public void processElement(@Element Integer ind, OutputReceiver<Row> out) {
>                     Row.Builder rowBuilder = Row.withSchema(rowSchema);
>                     List<Float> fv = new ArrayList<Float>();
>                     fv.add(1f * ind);
>                     fv.add(2f * ind);
>                     Row row = rowBuilder
>                             .addValue(ind)
>                             .addValue(fv)
>                             .build();
>                     out.output(row);
>                 }
>             }))
>             .setRowSchema(rowSchema)
>             .apply(
>                     SqlTransform.query("select fv, testUDF(fv) from PCOLLECTION")
>                             .registerUdf("testUDF", new TestUDF())
>             )
>             .apply(new Print());
>     p.run().waitUntilFinish();
> }
> 
> 
> -- 
> 
> György Balogh
> CTO
> E     [email protected] <mailto:[email protected]>
> M     +36 30 270 8342 <tel:+36%2030%20270%208342>
> A     HU, 1117 Budapest, Budafoki út 209.
> W     www.ultinous.com <http://www.ultinous.com/>
