haibo-duan opened a new issue, #9227:
URL: https://github.com/apache/inlong/issues/9227
### What happened
When I used Elasticsearch as the data node, the configuration process
encountered the following error:
[ ] 2023-11-07 08:19:08.132 - INFO [inlong-mq-process-0]
a.i.m.s.r.s.e.ElasticsearchApi:166 - create test_pulsar_table_es:true
[ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0]
.i.m.s.s.StreamSinkServiceImpl:518 - success to update sink status=130 for id=1
with log: success to create es resource
[ ] 2023-11-07 08:19:08.135 - INFO [inlong-mq-process-0]
.ElasticsearchResourceOperator:111 - success to create es resource for
sinkInfo=SinkInfo(id=1, inlongGroupId=test_pulsar_group,
inlongStreamId=test_pulsar_stream, sinkType=ELASTICSEARCH,
inlongClusterName=null, sinkName=test_pulsar_stream_sink,
dataNodeName=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d, description=null,
enableCreateResource=1,
extParams={"hosts":"http://elasticsearch:9200","username":"admin","password":"******","indexName":"test_pulsar_table_es","flushInterval":null,"flushRecord":1000,"retryTimes":null,"keyFieldNames":null,"documentType":"test_type","primaryKey":"name","esVersion":7,"encryptVersion":1,"properties":{}},
status=100, creator=admin, mqResource=test_pulsar_stream, dataType=CSV,
sourceSeparator=124, dataEscapeChar=null)
[ ] 2023-11-07 08:19:08.139 -ERROR [inlong-mq-process-0]
a.i.m.w.e.LogableEventListener:88 - execute listener
WorkflowEventLogEntity(id=null, processId=3,
processName=CREATE_STREAM_RESOURCE, processDisplayName=Create Stream,
inlongGroupId=test_pulsar_group, taskId=4, elementName=InitSink,
elementDisplayName=Stream-InitSink, eventType=TaskEvent, event=COMPLETE,
listener=StreamSinkResourceListener, startTime=Tue Nov 07 08:19:07 UTC 2023,
endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null, exception=find no
proper cluster assign to group=test_pulsar_group, stream=test_pulsar_stream,
sink type=ELASTICSEARCH, data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d )
error:
java.lang.IllegalArgumentException: find no proper cluster assign to
group=test_pulsar_group, stream=test_pulsar_stream, sink type=ELASTICSEARCH,
data node=a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d
at
org.apache.inlong.manager.common.util.Preconditions.expectNotBlank(Preconditions.java:143)
~[manager-common-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.resource.sink.AbstractStandaloneSinkResourceOperator.assignCluster(AbstractStandaloneSinkResourceOperator.java:58)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.resource.sink.es.ElasticsearchResourceOperator.createSinkResource(ElasticsearchResourceOperator.java:79)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.listener.sink.StreamSinkResourceListener.listen(StreamSinkResourceListener.java:95)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$0(QueueResourceListener.java:159)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
~[?:1.8.0_342]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_342]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
[ ] 2023-11-07 08:19:08.154 - INFO [inlong-mq-process-0]
.m.s.s.InlongStreamServiceImpl:725 - success to update stream after approve for
groupId=test_pulsar_group, streamId=test_pulsar_stream
[ ] 2023-11-07 08:19:08.160 -ERROR [inlong-mq-process-0]
.m.s.l.q.QueueResourceListener:168 - failed to start stream process for
groupId=test_pulsar_group streamId=test_pulsar_stream
[ ] 2023-11-07 08:19:08.160 -ERROR [inlong-workflow-0]
.m.s.l.q.QueueResourceListener:179 - failed to execute stream process in
asynchronously
java.util.concurrent.ExecutionException:
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed
to start stream process for groupId=test_pulsar_group
streamId=test_pulsar_stream
at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
~[?:1.8.0_342]
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
~[?:1.8.0_342]
at
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:176)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_342]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
Caused by:
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed
to start stream process for groupId=test_pulsar_group
streamId=test_pulsar_stream
at
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.lambda$createQueueForStreams$1(QueueResourceListener.java:169)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
~[?:1.8.0_342]
at
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
~[?:1.8.0_342]
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
~[?:1.8.0_342]
at
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
~[?:1.8.0_342]
... 3 more
[ ] 2023-11-07 08:19:08.161 -ERROR [inlong-workflow-0]
a.i.m.w.e.LogableEventListener:88 - execute listener
WorkflowEventLogEntity(id=null, processId=2, processName=CREATE_GROUP_RESOURCE,
processDisplayName=Create Group, inlongGroupId=test_pulsar_group, taskId=2,
elementName=InitMQ, elementDisplayName=Group-InitMQ, eventType=TaskEvent,
event=COMPLETE, listener=QueueResourceListener, startTime=Tue Nov 07 08:19:07
UTC 2023, endTime=null, status=-1, async=0, ip=172.18.0.7, remark=null,
exception=failed to execute stream process in asynchronously :
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed
to start stream process for groupId=test_pulsar_group
streamId=test_pulsar_stream) error:
org.apache.inlong.manager.common.exceptions.WorkflowListenerException:
failed to execute stream process in asynchronously :
org.apache.inlong.manager.common.exceptions.WorkflowListenerException: failed
to start stream process for groupId=test_pulsar_group
streamId=test_pulsar_stream
at
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.createQueueForStreams(QueueResourceListener.java:180)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.listener.queue.QueueResourceListener.listen(QueueResourceListener.java:134)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.LogableEventListener.executeListenerWithLog(LogableEventListener.java:79)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.LogableEventListener.listen(LogableEventListener.java:60)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.event.task.TaskEventNotifier.notify(TaskEventNotifier.java:57)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.processor.ServiceTaskProcessor.complete(ServiceTaskProcessor.java:115)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:99)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeComplete(ProcessorExecutorImpl.java:104)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessorExecutorImpl.executeStart(ProcessorExecutorImpl.java:92)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.workflow.core.impl.ProcessServiceImpl.start(ProcessServiceImpl.java:75)
~[manager-workflow-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.workflow.WorkflowServiceImpl.start(WorkflowServiceImpl.java:90)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.workflow.WorkflowService.startAsync(WorkflowService.java:66)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
org.apache.inlong.manager.service.listener.group.apply.ApproveApplyProcessListener.lambda$listen$0(ApproveApplyProcessListener.java:79)
~[manager-service-1.10.0-SNAPSHOT.jar:1.10.0-SNAPSHOT]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
~[?:1.8.0_342]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
~[?:1.8.0_342]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
[ ] 2023-11-07 08:19:08.169 - INFO [inlong-workflow-0]
.s.l.g.InitGroupFailedListener:62 - begin to execute InitGroupFailedListener
for groupId=test_pulsar_group
[ ] 2023-11-07 08:19:08.170 - INFO [inlong-workflow-0]
i.m.s.g.InlongGroupServiceImpl:446 - begin to update group status to [120] for
groupId=test_pulsar_group by user=admin
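
The log above shows `inlongClusterName=null` in the `SinkInfo`, and the exception is thrown from `Preconditions.expectNotBlank` inside `assignCluster`. The sketch below only illustrates that failure pattern, assuming the check rejects a null or blank cluster name. The class name, method signatures, and parameters are hypothetical and are not the actual InLong Manager code, which may resolve the cluster differently.

```java
public class AssignClusterSketch {

    // Hypothetical stand-in for a precondition helper:
    // throws IllegalArgumentException when the value is null or blank.
    static void expectNotBlank(String value, String message) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException(message);
        }
    }

    // Hypothetical cluster assignment: returns the cluster name if one was
    // resolved, otherwise fails with a message shaped like the one in the log.
    static String assignCluster(String clusterName, String groupId, String streamId,
            String sinkType, String dataNode) {
        String message = String.format(
                "find no proper cluster assign to group=%s, stream=%s, sink type=%s, data node=%s",
                groupId, streamId, sinkType, dataNode);
        expectNotBlank(clusterName, message);
        return clusterName;
    }

    public static void main(String[] args) {
        try {
            // A null cluster name mirrors inlongClusterName=null from the log.
            assignCluster(null, "test_pulsar_group", "test_pulsar_stream",
                    "ELASTICSEARCH", "a8a3d27a-50b7-4264-9b4c-ea88c0a04f8d");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this assumption, the sink fails as soon as no cluster name can be resolved for it, even though the Elasticsearch index itself was created successfully a few lines earlier in the log.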



### What you expected to happen
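The Elasticsearch sink is configured successfully and the stream process starts without the `find no proper cluster assign` error.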
### How to reproduce
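Create a stream with an Elasticsearch sink that uses an Elasticsearch data node, then approve the group so that the sink resource is created.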
### Environment
Ubuntu 22.04.3 LTS
### InLong version
master
### InLong Component
InLong Manager
### Are you willing to submit PR?
- [X] Yes, I am willing to submit a PR!
### Code of Conduct
- [X] I agree to follow this project's [Code of Conduct](https://www.apache.org/foundation/policies/conduct)