It looks like Calcite decided to use REAL for Float values in SqlTransform, while Beam SQL (if I'm not mistaken) has no conversion from Calcite's REAL type to any Beam schema field type. A workaround could be to add such a conversion (REAL -> FLOAT) into CalciteUtils.java.
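Until such a conversion exists, an untested user-side workaround might be to have the UDF return Double instead of Float, so that the planner infers DOUBLE (which Beam SQL can already map back to a schema field) rather than REAL for the result column. A rough sketch, based on the TestUDF from your mail (the class name here is only illustrative):

public static class TestUdfDouble implements SerializableFunction<List<Float>, Double> {
    // Returning Double instead of Float: Calcite should infer DOUBLE for the result
    // column, which CalciteUtils can convert to a Beam schema field, unlike REAL.
    @Override
    public Double apply(List<Float> fv) {
        double sum = 0;
        if (fv != null) {
            for (Float a : fv) {
                sum += a;
            }
        }
        return sum;
    }
}

It would be registered the same way as before, e.g. .registerUdf("testUDF", new TestUdfDouble()). I have not tried this myself, so please treat it as a sketch.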
— Alexey

> On 12 Oct 2023, at 20:19, Balogh, György <[email protected]> wrote:
>
> Hi,
> I'm using beam 2.51.0
> I'm trying to use UDF to transform float arrays and got the following error:
>
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot find a matching Beam FieldType for Calcite type: REAL
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toFieldType(CalciteUtils.java:280)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:253)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toField(CalciteUtils.java:249)
>     at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
>     at java.base/java.util.AbstractList$RandomAccessSpliterator.forEachRemaining(AbstractList.java:720)
>     at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
>     at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
>     at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
>     at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>     at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
>     at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:174)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:182)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamCalcRel$Transform.expand(BeamCalcRel.java:154)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:496)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:107)
>     at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:56)
>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:169)
>     at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
>     at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:545)
>     at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:479)
>     at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:352)
>     at com.ultinous.uquery.query.sandbox.Ops3.example6(Ops3.java:220)
>     at com.ultinous.uquery.query.sandbox.Ops3.main(Ops3.java:243)
>
> This is my code:
>
> public static class TestUDF implements SerializableFunction<List<Float>, Float> {
>     @Override
>     public Float apply(List<Float> fv) {
>         float sum = 0;
>         if (fv != null)
>             for (Float a : fv)
>                 sum += a;
>         return sum;
>     }
> }
>
> public static void example6() {
>     System.out.println("Example 6");
>
>     Schema rowSchema = Schema.builder()
>         .addField("ind", Schema.FieldType.INT32)
>         .addField("fv", Schema.FieldType.array(Schema.FieldType.FLOAT))
>         .build();
>
>     Pipeline p = createPipeline();
>     p
>         .apply(org.apache.beam.sdk.transforms.Create.of(1, 2, 3, 4, 5))
>         .apply(ParDo.of(new DoFn<Integer, Row>() {
>             @ProcessElement
>             public void processElement(@Element Integer ind, OutputReceiver<Row> out) {
>                 Row.Builder rowBuilder = Row.withSchema(rowSchema);
>                 List<Float> fv = new ArrayList<Float>();
>                 fv.add(1f * ind);
>                 fv.add(2f * ind);
>                 Row row = rowBuilder
>                     .addValue(ind)
>                     .addValue(fv)
>                     .build();
>                 out.output(row);
>             }
>         }))
>         .setRowSchema(rowSchema)
>         .apply(
>             SqlTransform.query("select fv, testUDF(fv) from PCOLLECTION")
>                 .registerUdf("testUDF", new TestUDF())
>         )
>         .apply(new Print());
>     p.run().waitUntilFinish();
> }
>
> --
>
> György Balogh
> CTO
> E [email protected]
> M +36 30 270 8342
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
