[ https://issues.apache.org/jira/browse/HBASE-27267 ]
zhengsicheng reassigned HBASE-27267:
------------------------------------

    Assignee: zhengsicheng

> Delete causes timestamp to be negative
> --------------------------------------
>
>                 Key: HBASE-27267
>                 URL: https://issues.apache.org/jira/browse/HBASE-27267
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 2.3.4
>            Reporter: zhengsicheng
>            Assignee: zhengsicheng
>            Priority: Major
>
> With a 1.1.6 client writing to a 2.3.4 server, there is a case where a batch delete produces a negative cell timestamp.
> # 1. RegionServer log message (the replication WAL reader hits the bad cell):
> {code:java}
> 2022-07-19 12:13:29,324 WARN [RS_OPEN_REGION-regionserver/HBASE-HOSTNAME1:16020-1.replicationSource.wal-reader.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.regiongroup-2,clusterB] hbase.KeyValueUtil: Timestamp cannot be negative, ts=-4323977095312258207, KeyValueBytesHex=\x00\x00\x00, offset=0, length=40
> 2022-07-19 12:13:29,324 WARN [RS_OPEN_REGION-regionserver/HBASE-HOSTNAME1:16020-1.replicationSource.wal-reader.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.regiongroup-2,clusterB] wal.ProtobufLogReader: Encountered a malformed edit, seeking back to last good position in file, from 1099261 to 1078224
> java.io.EOFException: EOF while reading 660 WAL KVs; started reading at 1078317 and read up to 1099261
>         at org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.readNext(ProtobufLogReader.java:403)
>         at org.apache.hadoop.hbase.regionserver.wal.ReaderBase.next(ReaderBase.java:97)
>         at org.apache.hadoop.hbase.regionserver.wal.ReaderBase.next(ReaderBase.java:85)
>         at org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.readNextEntryAndRecordReaderPosition(WALEntryStream.java:264)
>         at org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.tryAdvanceEntry(WALEntryStream.java:178)
>         at org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.hasNext(WALEntryStream.java:103)
>         at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.readWALEntries(ReplicationSourceWALReader.java:230)
>         at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.run(ReplicationSourceWALReader.java:145)
> Caused by: java.lang.IllegalArgumentException: Timestamp cannot be negative, ts=-4323977095312258207, KeyValueBytesHex=\x00\x00\x00, offset=0, length=40
>         at org.apache.hadoop.hbase.KeyValueUtil.checkKeyValueBytes(KeyValueUtil.java:612)
>         at org.apache.hadoop.hbase.KeyValue.<init>(KeyValue.java:346)
>         at org.apache.hadoop.hbase.KeyValueUtil.createKeyValueFromInputStream(KeyValueUtil.java:717)
>         at org.apache.hadoop.hbase.codec.KeyValueCodecWithTags$KeyValueDecoder.parseCell(KeyValueCodecWithTags.java:81)
>         at org.apache.hadoop.hbase.codec.BaseDecoder.advance(BaseDecoder.java:68)
>         at org.apache.hadoop.hbase.wal.WALEdit.readFromCells(WALEdit.java:276)
>         at org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.readNext(ProtobufLogReader.java:387)
>         ... 7 more
> {code}
> # 2. Debugging the WAL file shows that the negative timestamp was written by a delete operation:
> {code:java}
> Sequence=365693989, table=tableA, region=148cedb7b8ca3145690800fd650e084d, at write timestamp=Sat Jul 16 00:50:01 CST 2022
> 2022-07-22 22:09:43,244 ERROR [main] wal.WALPrettyPrinter: Timestamp is negative row=rowkey1, column=d:act, timestamp=-4323977095312258207, type=Delete
> {code}
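> The dump above comes from the WAL pretty printer. The same tool can be run through the {{hbase wal}} shell command or programmatically; below is a minimal sketch for reproducing the dump, assuming hbase-server 2.3.x on the classpath (the WAL path is a placeholder):
> {code:scala}
> import org.apache.hadoop.hbase.wal.WALPrettyPrinter
>
> // Dump only edits touching the affected row; -p also prints cell values.
> // CLI equivalent: hbase wal -p -w rowkey1 /path/to/wal-file
> WALPrettyPrinter.main(Array("-p", "-w", "rowkey1", "/path/to/wal-file"))
> {code}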
> # 3. The user deletes rows from HBase with a Spark job; the batch size is 10000:
> {code:scala}
> def dataDeleteFromHbase(rdd: RDD[(String, String)], hbase_table: String,
>     hbase_instance: String, hbase_accesskey: String,
>     accumulator: LongAccumulator, buffersize: String, batchsize: Int): Unit = {
>   rdd.foreachPartition(iterator => {
>     val partitionId = TaskContext.getPartitionId()
>     val conf = HBaseConfiguration.create()
>     val connection = SparkHbaseUtils.getconnection(conf)
>     val table = connection.getTable(TableName.valueOf(hbase_table))
>     var deleteList = new util.LinkedList[Delete]()
>     var count = 0
>     var batchCount = 0
>     while (iterator.hasNext) {
>       val element = iterator.next()
>       // Row key = MD5("<k>_<v>") plus the low 4 bytes of its CRC32.
>       val crc32 = new CRC32()
>       crc32.update(s"${element._1}_${element._2}".getBytes())
>       val crcArr = convertLow4bit2SmallEndan(crc32.getValue)
>       val key = concat(DigestUtils.md5(s"${element._1}_${element._2}"), crcArr)
>       // No explicit timestamp: the Delete is sent with LATEST_TIMESTAMP.
>       val delete = new Delete(key)
>       deleteList.add(delete)
>       count += 1
>       if (count % batchsize == 0) {
>         batchCount += 1
>         try {
>           table.delete(deleteList)
>         } catch {
>           case _: RetriesExhaustedWithDetailsException =>
>             LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
>             Thread.sleep(1000)
>             processDelThrottlingException(table, deleteList, partitionId, batchCount)
>           case _: ThrottlingException =>
>             LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
>             Thread.sleep(1000)
>             processDelThrottlingException(table, deleteList, partitionId, batchCount)
>         }
>         LOGGER.warn(s"======partitionId: ${partitionId}===${batchCount * batchsize} rows delete success! ============")
>         accumulator.add(batchsize)
>         LOGGER.warn(s"##########################already delete count: ${accumulator.value}#######################")
>         deleteList = new util.LinkedList[Delete]()
>       }
>     }
>     // Flush the final partial batch.
>     if (CollectionUtils.isNotEmpty(deleteList)) {
>       batchCount += 1
>       val listSize = deleteList.size()
>       try {
>         table.delete(deleteList)
>       } catch {
>         case _: RetriesExhaustedWithDetailsException =>
>           LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
>           Thread.sleep(1000)
>           processDelThrottlingException(table, deleteList, partitionId, batchCount)
>         case _: ThrottlingException =>
>           LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: ${batchCount}===Wait 1000 ms, retry......============")
>           Thread.sleep(1000)
>           processDelThrottlingException(table, deleteList, partitionId, batchCount)
>       }
>       LOGGER.warn(s"======partitionId: ${partitionId}===${(batchCount - 1) * batchsize + listSize} rows delete success! ============")
>       accumulator.add(listSize)
>       LOGGER.warn(s"##########################already delete count: ${accumulator.value}#######################")
>     }
>     if (table != null) {
>       table.close()
>     }
>     if (connection != null) {
>       connection.close()
>     }
>   })
> }
> {code}
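> Note that the job builds {{new Delete(key)}} with no explicit timestamp, so each tombstone is sent carrying {{HConstants.LATEST_TIMESTAMP}} and the server is expected to fill in the real time. As a defensive workaround while the root cause is open, the timestamp can be pinned client-side so the value the client puts in the cell is known to be non-negative; a sketch of the changed lines only (illustrative, not a fix for the underlying bug):
> {code:scala}
> import org.apache.hadoop.hbase.client.Delete
>
> // Instead of: val delete = new Delete(key)
> // pin an explicit, non-negative timestamp on every tombstone.
> val ts = System.currentTimeMillis()
> val delete = new Delete(key, ts) // removes all versions with timestamp <= ts
> deleteList.add(delete)
> {code}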