haifxu opened a new issue, #9205:
URL: https://github.com/apache/inlong/issues/9205
### What happened
Submitting the Sort job for a group with a MySQL binlog source and a Hudi sink fails. The Manager log:
```
[ ] 2023-11-02 08:35:50.457 -ERROR [inlong-plugin-0]
o.a.i.m.p.f.FlinkService :146 - submit job from info
FlinkInfo(endpoint=null, jobName=InLong-Sort-test_group_10,
inlongStreamInfoList=[InlongStreamInfo(id=13, inlongGroupId=test_group_10,
inlongStreamId=test_stream_10, name=null, description=,
mqResource=test_stream_10, dataType=null, dataEncoding=UTF-8,
dataSeparator=124, dataEscapeChar=null, syncSend=0, dailyRecords=10,
dailyStorage=10, peakRecords=1000, maxLength=10240, storagePeriod=1,
extParams={"ignoreParseError":true,"useExtendedFields":false}, status=130,
previousStatus=100, creator=admin, modifier=admin, createTime=Thu Nov 02
00:28:51 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 2023,
fieldList=[StreamField(id=58, inlongGroupId=test_group_10,
inlongStreamId=test_stream_10, fieldName=id, fieldType=int, fieldComment=null,
isPredefinedField=null, fieldValue=null, preExpression=null, isMetaField=0,
metaFieldName=null, fieldFormat=null, originNodeName=null,
originFieldName=null, extParams=null), StreamField(id=59, inlongGroupId=test_group_10,
inlongStreamId=test_stream_10, fieldName=name, fieldType=string,
fieldComment=null, isPredefinedField=null, fieldValue=null, preExpression=null,
isMetaField=0, metaFieldName=null, fieldFormat=null, originNodeName=null,
originFieldName=null, extParams=null)], extList=null,
sourceList=[MySQLBinlogSource(super=StreamSource(id=12,
inlongGroupId=test_group_10, inlongStreamId=test_stream_10,
sourceType=MYSQL_BINLOG, sourceName=test_source_10, agentIp=null, uuid=null,
inlongClusterName=null, inlongClusterNodeTag=null, dataNodeName=null,
serializationType=debezium_json, snapshot=null, version=1, status=101,
previousStatus=110, creator=admin, modifier=admin, createTime=Thu Nov 02
00:29:16 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC 2023, properties={},
templateId=null, subSourceList=null, ignoreParseError=false), user=root,
password=******, hostname=*, port=3306, serverId=0, includeSchema=null,
databaseWhiteList=test,
tableWhiteList=test.source_table, serverTimezone=null, intervalMs=500,
snapshotMode=initial, offsetFilename=null, historyFilename=null,
monitoredDdl=null, timestampFormatStandard=SQL, allMigration=false,
primaryKey=null, specificOffsetFile=null, specificOffsetPos=null)],
sinkList=[HudiSink(super=StreamSink(super=StreamNode(preNodes=null,
postNodes=null, fieldList=null), id=12, inlongGroupId=test_group_10,
inlongStreamId=test_stream_10, sinkType=HUDI, sinkName=test_sink_10,
description=null, inlongClusterName=null,
dataNodeName=65be8d12-4815-40ed-b52e-57d5a8ecdc5c, sortTaskName=null,
sortConsumerGroup=null, enableCreateResource=1, operateLog=success to create
Hudi resource, status=130, previousStatus=130, creator=admin, modifier=admin,
createTime=Thu Nov 02 00:35:06 UTC 2023, modifyTime=Thu Nov 02 00:35:42 UTC
2023, sinkFieldList=[SinkField(id=37, sinkType=null, inlongGroupId=null,
inlongStreamId=null, fieldName=id, fieldType=int, fieldComment=id,
isMetaField=0, metaFieldName=null,
fieldFormat=null, originNodeName=null, originFieldName=null,
sourceFieldName=id, sourceFieldType=int, extParams=null), SinkField(id=38,
sinkType=null, inlongGroupId=null, inlongStreamId=null, fieldName=name,
fieldType=string, fieldComment=name, isMetaField=0, metaFieldName=null,
fieldFormat=null, originNodeName=null, originFieldName=null,
sourceFieldName=name, sourceFieldType=string, extParams=null)], properties={},
dataEncoding=UTF-8, dataFormat=NONE, authentication=null, version=1),
catalogType=HIVE, catalogUri=thrift://*:9083, warehouse=hdfs://*/warehouse,
dbName=test_db, tableName=sink_table, dataPath=null, fileFormat=Parquet,
partitionType=null, primaryKey=, extList=[], partitionKey=null)], version=3,
wrapType=INLONG_MSG_V0, useExtendedFields=false, ignoreParseError=true)],
localJarPath=/opt/inlong-sort/sort-dist-1.10.0-SNAPSHOT.jar,
connectorJarPaths=[/opt/inlong-sort/connectors/sort-connector-mysql-cdc-1.10.0-SNAPSHOT.jar,
/opt/inlong-sort/connectors/sort-connector-hudi-1.10.0-SNAPSHOT.jar],
localConfPath=/opt/inlong-manager/lib/InLong-Sort-test_group_10,
sourceType=null, sinkType=null, jobId=null, savepointPath=null,
isException=false, exceptionMsg=null) failed:
org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Unable to create a sink for writing table
'default_catalog.default_database.sink_table'.
Table options are:
'connector'='hudi-inlong'
'hive_sync.db'='test_db'
'hive_sync.enabled'='true'
'hive_sync.metastore.uris'='thrift://*:9083'
'hive_sync.mode'='hms'
'hive_sync.table'='sink_table'
'hoodie.database.name'='test_db'
'hoodie.datasource.write.recordkey.field'=''
'hoodie.table.name'='sink_table'
'inlong.metric.labels'='groupId=test_group_10&streamId=test_stream_10&nodeId=test_sink_10'
'metrics.audit.key'='16'
'metrics.audit.proxy.hosts'='audit:10081'
'path'='hdfs://*/warehouse/test_db.db/sink_table'
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
~[flink-clients_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
~[flink-clients_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
~[flink-clients_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
~[flink-clients_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
~[flink-clients_2.11-1.13.5.jar:1.13.5]
at
org.apache.inlong.manager.plugin.flink.FlinkService.submitJobBySavepoint(FlinkService.java:192)
~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.plugin.flink.FlinkService.submit(FlinkService.java:144)
~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.plugin.flink.IntegrationTaskRunner.run(IntegrationTaskRunner.java:58)
~[manager-plugins-base-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
~[?:1.8.0_342]
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
~[?:1.8.0_342]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_342]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
Caused by: org.apache.flink.table.api.ValidationException: Unable to create
a sink for writing table 'default_catalog.default_database.sink_table'.
Table options are:
'connector'='hudi-inlong'
'hive_sync.db'='test_db'
'hive_sync.enabled'='true'
'hive_sync.metastore.uris'='thrift://*:9083'
'hive_sync.mode'='hms'
'hive_sync.table'='sink_table'
'hoodie.database.name'='test_db'
'hoodie.datasource.write.recordkey.field'=''
'hoodie.table.name'='sink_table'
'inlong.metric.labels'='groupId=test_group_10&streamId=test_stream_10&nodeId=test_sink_10'
'metrics.audit.key'='16'
'metrics.audit.proxy.hosts'='audit:10081'
'path'='hdfs://*/warehouse/test_db.db/sink_table'
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:171)
~[flink-table-common-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[scala-library-2.11.12.jar:?]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[scala-library-2.11.12.jar:?]
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[scala-library-2.11.12.jar:?]
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
~[flink-table-api-java-1.13.5.jar:1.13.5]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
~[flink-table-api-java-1.13.5.jar:1.13.5]
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
~[flink-table-api-java-1.13.5.jar:1.13.5]
at
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:84)
~[?:?]
at
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63)
~[?:?]
at org.apache.inlong.sort.Entrance.main(Entrance.java:76) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_342]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_342]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_342]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-clients_2.11-1.13.5.jar:1.13.5]
... 12 more
Caused by: org.apache.hudi.exception.HoodieValidationException: Field ''
specified in option 'hoodie.datasource.write.recordkey.field' does not exist in
the table schema.
at
org.apache.hudi.table.HoodieTableFactory.lambda$sanityCheck$2(HoodieTableFactory.java:139)
~[?:?]
at java.util.Optional.ifPresent(Optional.java:159) ~[?:1.8.0_342]
at
org.apache.hudi.table.HoodieTableFactory.sanityCheck(HoodieTableFactory.java:137)
~[?:?]
at
org.apache.hudi.table.HoodieTableFactory.createDynamicTableSink(HoodieTableFactory.java:91)
~[?:?]
at
org.apache.inlong.sort.hudi.table.HudiTableInlongFactory.createDynamicTableSink(HudiTableInlongFactory.java:51)
~[?:?]
at
org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:168)
~[flink-table-common-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:367)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:201)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:162)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
~[scala-library-2.11.12.jar:?]
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
~[scala-library-2.11.12.jar:?]
at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
~[scala-library-2.11.12.jar:?]
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
~[scala-library-2.11.12.jar:?]
at
org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:162)
~[flink-table-planner-blink_2.11-1.13.5.jar:1.13.5]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1518)
~[flink-table-api-java-1.13.5.jar:1.13.5]
at
org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:740)
~[flink-table-api-java-1.13.5.jar:1.13.5]
at
org.apache.flink.table.api.internal.StatementSetImpl.execute(StatementSetImpl.java:99)
~[flink-table-api-java-1.13.5.jar:1.13.5]
at
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.executeLoadSqls(FlinkSqlParseResult.java:84)
~[?:?]
at
org.apache.inlong.sort.parser.result.FlinkSqlParseResult.execute(FlinkSqlParseResult.java:63)
~[?:?]
at org.apache.inlong.sort.Entrance.main(Entrance.java:76) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_342]
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_342]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_342]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_342]
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
~[flink-clients_2.11-1.13.5.jar:1.13.5]
... 12 more
```
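The root cause is visible at the bottom of the trace: the sink's `primaryKey` is empty, so the generated table options contain `'hoodie.datasource.write.recordkey.field'=''`, and Hudi's sanity check rejects the empty field name. A minimal sketch (plain Java, not InLong or Hudi code) that mirrors that check against the logged options and schema fields (`id`, `name`):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Illustrative only: reproduces the failing validation from the trace.
 * The option key is copied from the log; the schema fields come from the
 * sink's SinkField list (id, name).
 */
public class RecordKeyCheck {

    private static final String RECORD_KEY_OPTION = "hoodie.datasource.write.recordkey.field";

    static void sanityCheck(Map<String, String> options, Set<String> schemaFields) {
        String recordKey = options.get(RECORD_KEY_OPTION);
        // "".split(",") yields one empty element, so the check runs with field = ""
        for (String field : recordKey.split(",")) {
            if (!schemaFields.contains(field)) {
                // Matches the logged HoodieValidationException message
                throw new IllegalStateException("Field '" + field + "' specified in option '"
                        + RECORD_KEY_OPTION + "' does not exist in the table schema.");
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put(RECORD_KEY_OPTION, ""); // empty because HudiSink.primaryKey was empty
        sanityCheck(options, new HashSet<>(Arrays.asList("id", "name")));
    }
}
```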
### What you expected to happen
The Hudi sink is created and the Sort job is submitted successfully.
### How to reproduce
Create a stream with a MySQL binlog source and a Hudi sink, leave the sink's primary key empty, and start the group; job submission then fails as above. A sketch of a possible fail-fast guard follows.
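Hypothetically, the Manager could reject an empty primary key when it builds the Hudi sink options, instead of letting the empty value reach Hudi's sanity check at submission time. Method and structure below are assumptions for illustration, not the actual InLong Manager API:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical guard, sketched for illustration: validate the primary key
 * before forwarding it as 'hoodie.datasource.write.recordkey.field'.
 */
public class HudiSinkGuard {

    static Map<String, String> buildHudiOptions(String tableName, String primaryKey) {
        if (primaryKey == null || primaryKey.trim().isEmpty()) {
            // Fail fast with a clear message rather than an empty record key
            throw new IllegalArgumentException(
                    "Hudi sink '" + tableName + "' requires a non-empty primary key; "
                            + "it is forwarded as 'hoodie.datasource.write.recordkey.field'");
        }
        Map<String, String> options = new HashMap<>();
        options.put("connector", "hudi-inlong");
        options.put("hoodie.table.name", tableName);
        options.put("hoodie.datasource.write.recordkey.field", primaryKey);
        return options;
    }

    public static void main(String[] args) {
        System.out.println(buildHudiOptions("sink_table", "id")); // valid
        buildHudiOptions("sink_table", "");                       // fails fast
    }
}
```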
### Environment
_No response_
### InLong version
master
### InLong Component
InLong Manager
### Are you willing to submit PR?
- [ ] Yes, I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of
Conduct](https://www.apache.org/foundation/policies/conduct)