jxwnhj0717 commented on issue #4550:
URL: https://github.com/apache/iceberg/issues/4550#issuecomment-1381352493
I added logging around lock acquisition and release in
LockManagers$InMemoryLockManager and reproduced the bad table issue:
`
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquiring lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquiring lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquired lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - acquired lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - add lock heartbeat. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,672 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - add lock heartbeat. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/22f4f9c5-35d2-467c-8263-d26fd6548017.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_docsource_type (1/1)#2] 2023-01-11 18:46:03,674 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_docsource_type/metadata/v749.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/ced05720-8f32-49c9-884d-49b604290622.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_province (1/1)#2] 2023-01-11 18:46:03,677 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_province/metadata/v749.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/v749.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_base_city/metadata/27a6b56d-7747-4a7b-9da1-694c5c6f8b3b.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - releasing lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.util.LockManagers$InMemoryLockManager [] - released lock. entityId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json ownerId:hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/db1c3994-dd04-410a-aa95-6516b7615246.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_rep_person (1/1)#2] 2023-01-11 18:46:03,679 INFO org.apache.iceberg.hadoop.HadoopTableOperations [] - Committed a new metadata file hdfs://hadoop-node-0001:8022/uas/ods_dms_db/ods_t_rep_person/metadata/v756.metadata.json
[IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2] 2023-01-11 18:46:03,684 WARN org.apache.flink.runtime.taskmanager.Task [] - IcebergFilesCommitter -> Sink: IcebergSink hadoop.ods_dms_db.ods_t_base_city (1/1)#2 (5dec3e28251735766bb3eb423ca5a45c) switched from RUNNING to FAILED with failure cause: java.lang.NullPointerException
    at org.apache.iceberg.util.LockManagers$InMemoryLockManager.release(LockManagers.java:235)
    at org.apache.iceberg.hadoop.HadoopTableOperations.renameToFinal(HadoopTableOperations.java:377)
    at org.apache.iceberg.hadoop.HadoopTableOperations.commit(HadoopTableOperations.java:159)
    at org.apache.iceberg.SnapshotProducer.lambda$commit$2(SnapshotProducer.java:317)
    at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:404)
    at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:214)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:198)
    at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:190)
    at org.apache.iceberg.SnapshotProducer.commit(SnapshotProducer.java:295)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitOperation(IcebergFilesCommitter.java:312)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitDeltaTxn(IcebergFilesCommitter.java:276)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.commitUpToCheckpoint(IcebergFilesCommitter.java:218)
    at org.apache.iceberg.flink.sink.IcebergFilesCommitter.notifyCheckpointComplete(IcebergFilesCommitter.java:188)
    at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.notifyCheckpointComplete(StreamOperatorWrapper.java:99)
    at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.notifyCheckpointComplete(SubtaskCheckpointCoordinatorImpl.java:334)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:1171)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointCompleteAsync$10(StreamTask.java:1136)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$notifyCheckpointOperation$12(StreamTask.java:1159)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:344)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:330)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.lang.Thread.run(Thread.java:748)
`
The code is as follows:
`
  @VisibleForTesting
  void acquireOnce(String entityId, String ownerId) {
    InMemoryLockContent content = LOCKS.get(entityId);
    if (content != null && content.expireMs() > System.currentTimeMillis()) {
      throw new IllegalStateException(String.format(
          "Lock for %s currently held by %s, expiration: %s",
          entityId, content.ownerId(), content.expireMs()));
    }

    LOG.info("acquiring lock. entityId:{} ownerId:{}", entityId, ownerId);
    long expiration = System.currentTimeMillis() + heartbeatTimeoutMs();
    boolean succeed;
    if (content == null) {
      InMemoryLockContent previous = LOCKS.putIfAbsent(
          entityId, new InMemoryLockContent(ownerId, expiration));
      succeed = previous == null;
    } else {
      succeed = LOCKS.replace(entityId, content, new InMemoryLockContent(ownerId, expiration));
    }

    if (succeed) {
      LOG.info("acquired lock. entityId:{} ownerId:{}", entityId, ownerId);
      // cleanup old heartbeat
      if (HEARTBEATS.containsKey(entityId)) {
        HEARTBEATS.remove(entityId).cancel(false);
        LOG.info("remove lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
      }

      HEARTBEATS.put(entityId, scheduler().scheduleAtFixedRate(() -> {
        InMemoryLockContent lastContent = LOCKS.get(entityId);
        try {
          long newExpiration = System.currentTimeMillis() + heartbeatTimeoutMs();
          LOCKS.replace(entityId, lastContent, new InMemoryLockContent(ownerId, newExpiration));
        } catch (NullPointerException e) {
          throw new RuntimeException("Cannot heartbeat to a deleted lock " + entityId, e);
        }
      }, 0, heartbeatIntervalMs(), TimeUnit.MILLISECONDS));
      LOG.info("add lock heartbeat. entityId:{} ownerId:{}", entityId, ownerId);
    } else {
      throw new IllegalStateException("Unable to acquire lock " + entityId);
    }
  }

  @Override
  public boolean release(String entityId, String ownerId) {
    LOG.info("releasing lock. entityId:{} ownerId:{}", entityId, ownerId);
    InMemoryLockContent currentContent = LOCKS.get(entityId);
    if (currentContent == null) {
      LOG.error("Cannot find lock for entity {}", entityId);
      return false;
    }

    if (!currentContent.ownerId().equals(ownerId)) {
      LOG.error("Cannot unlock {} by {}, current owner: {}", entityId, ownerId, currentContent.ownerId());
      return false;
    }

    HEARTBEATS.remove(entityId).cancel(false);
    LOCKS.remove(entityId);
    LOG.info("released lock. entityId:{} ownerId:{}", entityId, ownerId);
    return true;
  }
`
HEARTBEATS.remove(entityId) returns null when the lock is released. Strangely,
the heartbeat for this entityId was added at 2023-01-11 18:46:03,672, yet at
2023-01-11 18:46:03,684 the corresponding heartbeat can no longer be found,
even though the whole process is single-threaded.
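
The NullPointerException in the stack trace is what you get when `HEARTBEATS.remove(entityId)` returns null and `.cancel(false)` is called on the result. Below is a minimal, self-contained sketch of that failure mode next to a null-safe variant. It uses a plain `ConcurrentHashMap` as a stand-in for the real map. The class name `HeartbeatReleaseSketch` and the methods `unguardedCancel` and `guardedCancel` are hypothetical and not part of Iceberg. The sketch simply clears the map to simulate the missing entry; how the entry actually disappears is not established by the logs above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class HeartbeatReleaseSketch {
  // Stand-in for the HEARTBEATS map in the quoted code (hypothetical, not Iceberg's field).
  static final Map<String, ScheduledFuture<?>> HEARTBEATS = new ConcurrentHashMap<>();

  // Same shape as the quoted release(): dereferences the result of remove() directly,
  // so it throws NullPointerException when no heartbeat is registered for entityId.
  static void unguardedCancel(String entityId) {
    HEARTBEATS.remove(entityId).cancel(false);
  }

  // Null-safe variant: returns true only if a heartbeat was registered and has been removed.
  static boolean guardedCancel(String entityId) {
    ScheduledFuture<?> heartbeat = HEARTBEATS.remove(entityId);
    if (heartbeat == null) {
      return false;
    }
    heartbeat.cancel(false);
    return true;
  }

  public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    try {
      String entityId = "example-entity"; // hypothetical id, not a real metadata path
      HEARTBEATS.put(entityId, scheduler.scheduleAtFixedRate(() -> { }, 0, 1, TimeUnit.SECONDS));

      // Assumption for illustration only: something empties the map between acquire and release.
      HEARTBEATS.clear();

      try {
        unguardedCancel(entityId);
      } catch (NullPointerException e) {
        System.out.println("unguarded: NullPointerException");
      }
      System.out.println("guarded: " + guardedCancel(entityId));
    } finally {
      // Stops the heartbeat task that was orphaned by clear().
      scheduler.shutdownNow();
    }
  }
}
```

A guard like this would only turn the crash into a failed release. It would not explain why the heartbeat registered at 18:46:03,672 is gone by 18:46:03,684.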
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]