Dawid Wysakowicz created FLINK-35519:
----------------------------------------

             Summary: Flink Job fails with SingleValueAggFunction received more than one element
                 Key: FLINK-35519
                 URL: https://issues.apache.org/jira/browse/FLINK-35519
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.19.0
            Reporter: Dawid Wysakowicz


When running a query:
{code}
select 
   (SELECT 
       t.id 
    FROM raw_pagerduty_users, UNNEST(teams) AS t(id, type, summary, self, html_url))
from raw_pagerduty_users;
{code}
it is translated to:

{code}
Sink(table=[default_catalog.default_database.sink], fields=[EXPR$0])
+- Calc(select=[$f0 AS EXPR$0])
   +- Join(joinType=[LeftOuterJoin], where=[true], select=[c, $f0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[single])
      :  +- Calc(select=[c])
      :     +- TableSourceScan(table=[[default_catalog, default_database, raw_pagerduty_users, project=[c, teams], metadata=[]]], fields=[c, teams])(reuse_id=[1])
      +- Exchange(distribution=[single])
         +- GroupAggregate(select=[SINGLE_VALUE(id) AS $f0])
            +- Exchange(distribution=[single])
               +- Calc(select=[id])
                  +- Correlate(invocation=[$UNNEST_ROWS$1($cor0.teams)], correlate=[table($UNNEST_ROWS$1($cor0.teams))], select=[c,teams,id,type,summary,self,html_url], rowType=[RecordType(BIGINT c, RecordType:peek_no_expand(VARCHAR(2147483647) id, VARCHAR(2147483647) type, VARCHAR(2147483647) summary, VARCHAR(2147483647) self, VARCHAR(2147483647) html_url) ARRAY teams, VARCHAR(2147483647) id, VARCHAR(2147483647) type, VARCHAR(2147483647) summary, VARCHAR(2147483647) self, VARCHAR(2147483647) html_url)], joinType=[INNER])
                     +- Reused(reference_id=[1])
{code}

and it fails with:

{code}
java.lang.RuntimeException: SingleValueAggFunction received more than one element.
        at GroupAggsHandler$150.accumulate(Unknown Source)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:151)
        at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
        at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
        at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:571)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:900)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:849)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
        at java.base/java.lang.Thread.run(Thread.java:829)
{code}
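
For context on the error message: the plan wraps the scalar subquery in GroupAggregate(select=[SINGLE_VALUE(id) AS $f0]), and the exception suggests that this aggregate rejects a second input row. The Java sketch below illustrates that contract only. The class name SingleValueSketch is hypothetical and this is not Flink's actual SingleValueAggFunction; it assumes the aggregate yields NULL for empty input and throws as soon as a second element arrives.

```java
// Hypothetical illustration of a "single value" aggregate contract.
// Assumption: this mirrors the behaviour implied by the error message,
// not the real Flink implementation.
final class SingleValueSketch<T> {
    private T value;       // the one value seen so far, or null
    private boolean seen;  // true once any element was accumulated

    // Accept the first element; reject any further one.
    void accumulate(T element) {
        if (seen) {
            throw new RuntimeException(
                    "SingleValueAggFunction received more than one element.");
        }
        value = element;
        seen = true;
    }

    // Returns null when no element was accumulated (empty subquery result).
    T getValue() {
        return value;
    }
}
```

Under these assumptions, accumulating one id succeeds, while a second id produced by the UNNEST in the subquery would trigger the exception shown in the stack trace.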



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
