stevenzwu commented on issue #6455:
URL: https://github.com/apache/iceberg/issues/6455#issuecomment-1358855539
@MurderWind if you can reproduce it in an IDE, you may be able to troubleshoot
it with a debugger.
`IcebergSource` should be using a new, dedicated thread pool rather than the shared worker pool:
```
public ContinuousSplitPlannerImpl(Table table, ScanContext scanContext, String threadName) {
  this.table = table;
  this.scanContext = scanContext;
  this.isSharedPool = threadName == null;
  this.workerPool =
      isSharedPool
          ? ThreadPools.getWorkerPool()
          : ThreadPools.newWorkerPool(
              "iceberg-plan-worker-pool-" + threadName, scanContext.planParallelism());
}
```
The `threadName` passed in from `IcebergSource` is non-null, so the planner takes the `newWorkerPool` branch:
```
  // Ideally, operatorId should be used as the threadPoolName as Flink guarantees its uniqueness
  // within a job. SplitEnumeratorContext doesn't expose the OperatorCoordinator.Context, which
  // would contain the OperatorID. Need to discuss with Flink community whether it is ok to expose
  // a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()"
  // from SourceCoordinatorContext implementation. For now, <table name>-<random UUID> is used as
  // the unique thread pool name.
  return lazyTable().name() + "-" + UUID.randomUUID();
}
```
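If it helps while debugging, the pool selection can be tried outside Flink. The following is only a rough sketch using plain `java.util.concurrent`, not the Iceberg code: `PlannerPoolSketch`, `SHARED_POOL`, `namedDaemonFactory`, and `runningThreadName` are made-up names, the fixed-size pools stand in for `ThreadPools.getWorkerPool()` / `ThreadPools.newWorkerPool(...)`, and the `close()` behavior is an assumption about how a non-shared pool would be cleaned up.

```java
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class PlannerPoolSketch {
  // Hypothetical stand-in for the process-wide pool (ThreadPools.getWorkerPool() in the real code).
  private static final ExecutorService SHARED_POOL =
      Executors.newFixedThreadPool(2, namedDaemonFactory("shared-worker-pool"));

  private final boolean isSharedPool;
  private final ExecutorService workerPool;

  public PlannerPoolSketch(String threadName, int parallelism) {
    // Same decision as in the constructor quoted above: null name means shared pool.
    this.isSharedPool = threadName == null;
    this.workerPool =
        isSharedPool
            ? SHARED_POOL
            : Executors.newFixedThreadPool(
                parallelism, namedDaemonFactory("iceberg-plan-worker-pool-" + threadName));
  }

  public boolean usesSharedPool() {
    return isSharedPool;
  }

  // Runs a task on the selected pool and reports which thread executed it.
  public String runningThreadName() throws Exception {
    Callable<String> task = () -> Thread.currentThread().getName();
    return workerPool.submit(task).get();
  }

  // Assumption: only a dedicated pool is shut down; the shared pool is left alone.
  public void close() {
    if (!isSharedPool) {
      workerPool.shutdown();
    }
  }

  // Builds daemon threads named "<prefix>-<n>" so the pool is easy to spot in a thread dump.
  static ThreadFactory namedDaemonFactory(String prefix) {
    AtomicInteger counter = new AtomicInteger();
    return runnable -> {
      Thread thread = new Thread(runnable, prefix + "-" + counter.getAndIncrement());
      thread.setDaemon(true);
      return thread;
    };
  }

  public static void main(String[] args) throws Exception {
    // Mirrors the "<table name>-<random UUID>" naming; "db.tbl" is a placeholder table name.
    PlannerPoolSketch dedicated = new PlannerPoolSketch("db.tbl-" + UUID.randomUUID(), 2);
    PlannerPoolSketch shared = new PlannerPoolSketch(null, 2);
    System.out.println("dedicated planner ran on: " + dedicated.runningThreadName());
    System.out.println("shared planner ran on: " + shared.runningThreadName());
    dedicated.close();
    shared.close();
  }
}
```

With a non-null name, the planning thread name should start with `iceberg-plan-worker-pool-`, which is what to look for in the debugger's thread list or a thread dump.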
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]