qingfei1994 opened a new issue, #5079:
URL: https://github.com/apache/paimon/issues/5079

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.
   
   
   ### Paimon version
   
   master : [7f01330]
   
   ### Compute Engine
   
   Flink
   
   ### Minimal reproduce step
   
   To reproduce, run a test case in PreAggregationITCase.java.
   If you run a DDL like the one below, it succeeds without any error, even though no factory for the aggregate function `avg` is on the classpath.
   ```
   @Test
   public void testWrongOption() {
       // VALUES does not guarantee order, but order is important for list aggregations.
       // So we need to sort the input data.
       batchSql(
               "CREATE TABLE IF NOT EXISTS T4 ("
                       + "j INT, k INT, "
                       + "a INT, "
                       + "b INT,"
                       + "PRIMARY KEY (j,k) NOT ENFORCED)"
                       + " WITH ('merge-engine'='aggregation', "
                       + "'fields.a.aggregate-function'='avg', "
                       + "'fields.b.aggregate-function'='last_non_null_value'"
                       + ");");
   }
   ```
   You won't get an error until you execute an INSERT like this:
   
   ```
   @Test
   public void testWrongOption() {
       // VALUES does not guarantee order, but order is important for list aggregations.
       // So we need to sort the input data.
       batchSql(
               "CREATE TABLE IF NOT EXISTS T4 ("
                       + "j INT, k INT, "
                       + "a INT, "
                       + "b INT,"
                       + "PRIMARY KEY (j,k) NOT ENFORCED)"
                       + " WITH ('merge-engine'='aggregation', "
                       + "'fields.a.aggregate-function'='avg', "
                       + "'fields.b.aggregate-function'='last_non_null_value'"
                       + ");");
       batchSql("INSERT INTO T4 VALUES (1, 2, 1, 2)");
   }
   ```
   The error message is as below:
   
   ```
   Caused by: org.apache.paimon.factories.FactoryException: Could not find any factory for identifier 'avg' that implements 'org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory' in the classpath.
   
   Available factory identifiers are:
   
   custom
   bool_and
   bool_or
   collect
   first_non_null_value
   first_not_null_value
   first_value
   hll_sketch
   last_non_null_value
   last_value
   listagg
   max
   merge_map
   min
   nested_update
   primary-key
   product
   rbm32
   rbm64
   sum
   theta_sketch
        at org.apache.paimon.factories.FactoryUtil.discoverFactory(FactoryUtil.java:66)
        at org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory.create(FieldAggregatorFactory.java:38)
        at org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction$Factory.create(AggregateMergeFunction.java:145)
        at org.apache.paimon.mergetree.compact.MergeFunctionFactory.create(MergeFunctionFactory.java:30)
        at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:222)
        at org.apache.paimon.operation.KeyValueFileStoreWrite.createWriter(KeyValueFileStoreWrite.java:95)
        at org.apache.paimon.operation.AbstractFileStoreWrite.createWriterContainer(AbstractFileStoreWrite.java:445)
        at org.apache.paimon.operation.AbstractFileStoreWrite.lambda$getWriterWrapper$5(AbstractFileStoreWrite.java:407)
        at java.util.HashMap.computeIfAbsent(HashMap.java:1127)
        at org.apache.paimon.operation.AbstractFileStoreWrite.getWriterWrapper(AbstractFileStoreWrite.java:406)
        at org.apache.paimon.operation.AbstractFileStoreWrite.write(AbstractFileStoreWrite.java:157)
        at org.apache.paimon.table.sink.TableWriteImpl.writeAndReturn(TableWriteImpl.java:187)
        at org.apache.paimon.flink.sink.StoreSinkWriteImpl.write(StoreSinkWriteImpl.java:195)
        at org.apache.paimon.flink.sink.DynamicBucketRowWriteOperator.processElement(DynamicBucketRowWriteOperator.java:54)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:238)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:157)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:114)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:638)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:973)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:917)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:949)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
        at java.lang.Thread.run(Thread.java:748)
   ```
   
   
   ### What doesn't meet your expectations?
   
   I think the options should be validated when the DDL is executed, so that an exception for invalid options is thrown at that point rather than later, when the INSERT is executed.
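
   As a rough illustration of the expected behavior, here is a minimal, self-contained sketch of an eager check that could run at table-creation time. The class `AggregateOptionValidator` and its `validate` method are hypothetical names made up for this example and are not part of Paimon. The identifier set is copied by hand from the error message above; an actual fix would presumably query the same factory discovery that `FieldAggregatorFactory.create` uses instead of hard-coding names.

   ```java
   import java.util.Arrays;
   import java.util.Map;
   import java.util.Set;
   import java.util.TreeSet;

   /** Hypothetical DDL-time check; an illustration only, not Paimon's actual code. */
   final class AggregateOptionValidator {

       private static final String PREFIX = "fields.";
       private static final String SUFFIX = ".aggregate-function";

       // Assumption: hard-coded from the "Available factory identifiers" list in the
       // error message above. A real implementation should discover these dynamically.
       private static final Set<String> KNOWN = new TreeSet<>(Arrays.asList(
               "custom", "bool_and", "bool_or", "collect", "first_non_null_value",
               "first_not_null_value", "first_value", "hll_sketch", "last_non_null_value",
               "last_value", "listagg", "max", "merge_map", "min", "nested_update",
               "primary-key", "product", "rbm32", "rbm64", "sum", "theta_sketch"));

       private AggregateOptionValidator() {}

       /** Throws IllegalArgumentException if any per-field aggregate function is unknown. */
       static void validate(Map<String, String> tableOptions) {
           for (Map.Entry<String, String> entry : tableOptions.entrySet()) {
               String key = entry.getKey();
               boolean isFieldAggOption =
                       key.startsWith(PREFIX)
                               && key.endsWith(SUFFIX)
                               && key.length() > PREFIX.length() + SUFFIX.length();
               if (isFieldAggOption && !KNOWN.contains(entry.getValue())) {
                   // Extract the field name between the prefix and the suffix.
                   String field =
                           key.substring(PREFIX.length(), key.length() - SUFFIX.length());
                   throw new IllegalArgumentException(
                           "Unknown aggregate function '" + entry.getValue()
                                   + "' for field '" + field
                                   + "'. Available identifiers: " + KNOWN);
               }
           }
       }
   }
   ```

   With a check along these lines, the `WITH` options from the DDL above would be rejected because of `'fields.a.aggregate-function'='avg'` before any data is written, while a table that only sets `'fields.b.aggregate-function'='last_non_null_value'` would pass.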
   
   ### Anything else?
   
   _No response_
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

