qingfei1994 opened a new issue, #5079: URL: https://github.com/apache/paimon/issues/5079
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Paimon version

master: [7f01330]

### Compute Engine

Flink

### Minimal reproduce step

This can be reproduced with a test case in `PreAggregationITCase.java`.

If you run a DDL statement like the one below, it succeeds silently, even though no aggregate function with the identifier `avg` is available on the classpath.

```
@Test
public void testWrongOption() {
    // 'avg' is not a registered aggregate function, yet this DDL succeeds.
    batchSql(
            "CREATE TABLE IF NOT EXISTS T4 ("
                    + "j INT, k INT, "
                    + "a INT, "
                    + "b INT,"
                    + "PRIMARY KEY (j,k) NOT ENFORCED)"
                    + " WITH ('merge-engine'='aggregation', "
                    + "'fields.a.aggregate-function'='avg', "
                    + "'fields.b.aggregate-function'='last_non_null_value'"
                    + ");");
}
```

You won't get an error until you execute an insert like this:

```
@Test
public void testWrongOption() {
    batchSql(
            "CREATE TABLE IF NOT EXISTS T4 ("
                    + "j INT, k INT, "
                    + "a INT, "
                    + "b INT,"
                    + "PRIMARY KEY (j,k) NOT ENFORCED)"
                    + " WITH ('merge-engine'='aggregation', "
                    + "'fields.a.aggregate-function'='avg', "
                    + "'fields.b.aggregate-function'='last_non_null_value'"
                    + ");");
    // The invalid option is only detected here, when the writer is created.
    batchSql("INSERT INTO T4 VALUES (1, 2, 1, 2)");
}
```

The error message is as follows:

```
Caused by: org.apache.paimon.factories.FactoryException: Could not find any factory for identifier 'avg' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath.
Available factory identifiers are:
custom
bool_and
bool_or
collect
first_non_null_value
first_not_null_value
first_value
hll_sketch
last_non_null_value
last_value
listagg
max
merge_map
min
nested_update
primary-key
product
rbm32
rbm64
sum
theta_sketch
    at org.apache.paimon.factories.FactoryUtil.discoverFactory(FactoryUtil.java:66)
    at org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory.create(FieldAggregatorFactory.java:38)
    at org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction$Factory.create(AggregateMergeFunction.java:145)
    at org.apache.paimon.mergetree.compact.MergeFunctionFactory.create(MergeFunctionFactory.java:30)
    at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:222)
    at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:95)
    at org.apache.paimon.operation.AbstractFileStoreWrite.createWriterContainer(AbstractFileStoreWrite.java:445)
    at org.apache.paimon.operation.AbstractFileStoreWrite.lambda$getWriterWrapper$5(AbstractFileStoreWrite.java:407)
    at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
    at org.apache.paimon.operation.AbstractFileStoreWrite.getWriterWrapper(AbstractFileStoreWrite.java:406)
    at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:157)
    at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:187)
    at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:195)
    at org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator.processElement(DynamicBucketRowWriteOperator.java:54)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
    at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
```

### What doesn't meet your expectations?

I think the options are supposed to be validated when the DDL is executed, so that a wrong option throws an exception at table-creation time rather than only when an insert is executed.

### Anything else?

_No response_

### Are you willing to submit a PR?

- [x] I'm willing to submit a PR!
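To make the expectation above concrete, here is a minimal sketch of the kind of DDL-time check this issue asks for. The class `AggregateFunctionOptionValidator` and its `validate` method are hypothetical and not part of Paimon. The set of accepted identifiers is hard-coded from the error message above; a real fix would presumably reuse Paimon's factory discovery (`FactoryUtil.discoverFactory`, which appears in the stack trace) when the table is created.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not Paimon's API): reject unknown
 * 'fields.<name>.aggregate-function' values when the table is created,
 * instead of when the first writer is created.
 */
public class AggregateFunctionOptionValidator {

    private static final String PREFIX = "fields.";
    private static final String SUFFIX = ".aggregate-function";

    // Identifiers copied from the FactoryException message; a real implementation
    // would discover them from the classpath instead of hard-coding them.
    private static final Set<String> KNOWN_IDENTIFIERS = new HashSet<>(Arrays.asList(
            "custom", "bool_and", "bool_or", "collect", "first_non_null_value",
            "first_not_null_value", "first_value", "hll_sketch", "last_non_null_value",
            "last_value", "listagg", "max", "merge_map", "min", "nested_update",
            "primary-key", "product", "rbm32", "rbm64", "sum", "theta_sketch"));

    /** Throws IllegalArgumentException for the first unknown aggregate function found. */
    public static void validate(Map<String, String> options) {
        for (Map.Entry<String, String> entry : options.entrySet()) {
            String key = entry.getKey();
            // Only look at keys of the form fields.<name>.aggregate-function with a non-empty name.
            if (key.startsWith(PREFIX) && key.endsWith(SUFFIX)
                    && key.length() > PREFIX.length() + SUFFIX.length()) {
                String function = entry.getValue();
                if (!KNOWN_IDENTIFIERS.contains(function)) {
                    String field = key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                    throw new IllegalArgumentException(String.format(
                            "Unknown aggregate function '%s' for field '%s'.", function, field));
                }
            }
        }
    }

    public static void main(String[] args) {
        // Same options as the CREATE TABLE statement in the reproduce step.
        Map<String, String> options = new HashMap<>();
        options.put("merge-engine", "aggregation");
        options.put("fields.a.aggregate-function", "avg");
        options.put("fields.b.aggregate-function", "last_non_null_value");
        try {
            validate(options);
            System.out.println("valid");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Unknown aggregate function 'avg' for field 'a'.`, which is the failure the reporter expects to see at `CREATE TABLE` time rather than at `INSERT` time.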
