[ 
https://issues.apache.org/jira/browse/HBASE-27267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhengsicheng reassigned HBASE-27267:
------------------------------------

    Assignee: zhengsicheng

> Delete causes timestamp to be negative
> --------------------------------------
>
>                 Key: HBASE-27267
>                 URL: https://issues.apache.org/jira/browse/HBASE-27267
>             Project: HBase
>          Issue Type: Bug
>    Affects Versions: 2.3.4
>            Reporter: zhengsicheng
>            Assignee: zhengsicheng
>            Priority: Major
>
> When client-1.1.6 and server-2.3.4 there is a case where the batch delete 
> timestamp is negative
> #  1. RegionServer log message:
> {code:java}
> 2022-07-19 12:13:29,324 WARN  
> [RS_OPEN_REGION-regionserver/HBASE-HOSTNAME1:16020-1.replicationSource.wal-reader.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.regiongroup-2,clusterB]
>  hbase.KeyValueUtil: Timestamp cannot be negative, ts=-4323977095312258207, 
> KeyValueBytesHex=\x00\x00\x00, offset=0, length=40
> 2022-07-19 12:13:29,324 WARN  
> [RS_OPEN_REGION-regionserver/HBASE-HOSTNAME1:16020-1.replicationSource.wal-reader.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.HBASE-HOSTNAME1.local%2C16020%2C1657184880284.regiongroup-2,clusterB]
>  wal.ProtobufLogReader: Encountered a malformed edit, seeking back to last 
> good position in file, from 1099261 to 1078224
> java.io.EOFException: EOF  while reading 660 WAL KVs; started reading at 
> 1078317 and read up to 1099261
>     at 
> org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.readNext(ProtobufLogReader.java:403)
>     at 
> org.apache.hadoop.hbase.regionserver.wal.ReaderBase.next(ReaderBase.java:97)
>     at 
> org.apache.hadoop.hbase.regionserver.wal.ReaderBase.next(ReaderBase.java:85)
>     at 
> org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.readNextEntryAndRecordReaderPosition(WALEntryStream.java:264)
>     at 
> org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.tryAdvanceEntry(WALEntryStream.java:178)
>     at 
> org.apache.hadoop.hbase.replication.regionserver.WALEntryStream.hasNext(WALEntryStream.java:103)
>     at 
> org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.readWALEntries(ReplicationSourceWALReader.java:230)
>     at 
> org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.run(ReplicationSourceWALReader.java:145)
> Caused by: java.lang.IllegalArgumentException: Timestamp cannot be negative, 
> ts=-4323977095312258207, KeyValueBytesHex=\x00\x00\x00, offset=0, length=40
>     at 
> org.apache.hadoop.hbase.KeyValueUtil.checkKeyValueBytes(KeyValueUtil.java:612)
>     at org.apache.hadoop.hbase.KeyValue.<init>(KeyValue.java:346)
>     at 
> org.apache.hadoop.hbase.KeyValueUtil.createKeyValueFromInputStream(KeyValueUtil.java:717)
>     at 
> org.apache.hadoop.hbase.codec.KeyValueCodecWithTags$KeyValueDecoder.parseCell(KeyValueCodecWithTags.java:81)
>     at org.apache.hadoop.hbase.codec.BaseDecoder.advance(BaseDecoder.java:68)
>     at org.apache.hadoop.hbase.wal.WALEdit.readFromCells(WALEdit.java:276)
>     at 
> org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.readNext(ProtobufLogReader.java:387)
>     ... 7 more
> {code}
> # 2. Debug WAL file ,found that the delete operation is caused
> {code:java}
> Sequence=365693989, table=tableA, region=148cedb7b8ca3145690800fd650e084d, at 
> write timestamp=Sat Jul 16 00:50:01 CST 2022
> 2022-07-22 22:09:43,244 ERROR [main] wal.WALPrettyPrinter: Timestamp is 
> negative row=rowkey1, column=d:act, timestamp=-4323977095312258207, 
> type=Delete
> {code}
> # 3. User use spark read/write hbase
> batchsize is 10000
> {code:scala}
> def dataDeleteFromHbase(rdd: RDD[(String, String)], hbase_table: String, 
> hbase_instance: String, hbase_accesskey: String, accumulator: 
> LongAccumulator, buffersize: String, batchsize: Int): Unit = {
>     rdd.foreachPartition(iterator => {
>       val partitionId = TaskContext.getPartitionId()
>       val conf = HBaseConfiguration.create()
>       val connection = SparkHbaseUtils.getconnection(conf)
>       val table = connection.getTable(TableName.valueOf(hbase_table))
>       var deleteList = new util.LinkedList[Delete]()
>       var count = 0
>       var batchCount = 0
>       while (iterator.hasNext) {
>         val element = iterator.next
>         val crc32 = new CRC32()
>         crc32.update(s"${element._1}_${element._2}".getBytes())
>         val crcArr = convertLow4bit2SmallEndan(crc32.getValue)
>         val key = concat(DigestUtils.md5(s"${element._1}_${element._2}"), 
> crcArr)
>         val delete = new Delete(key)
>         deleteList.add(delete)
>         count += 1
>         if (count % batchsize.toInt == 0) {
>           batchCount = batchCount + 1
>           try {
>             table.delete(deleteList)
>           } catch {
>             case _: RetriesExhaustedWithDetailsException => {
>               LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: 
> ${batchCount}===Wait 1000 ms, retry......============")
>               Thread.sleep(1000)
>               processDelThrottlingException(table, deleteList, partitionId, 
> batchCount)
>             }
>             case _: ThrottlingException => {
>               LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: 
> ${batchCount}===Wait 1000 ms, retry......============")
>               Thread.sleep(1000)
>               processDelThrottlingException(table, deleteList, partitionId, 
> batchCount)
>             }
>           }
>           LOGGER.warn(s"======partitionId: ${partitionId}===${batchCount * 
> batchsize} rows delete success! ============")
>           accumulator.add(batchsize)
>           LOGGER.warn(s"##########################already delete count: 
> ${accumulator.value}#######################")
>           deleteList = new util.LinkedList[Delete]()
>         }
>       }
>       if (CollectionUtils.isNotEmpty(deleteList)) {
>         batchCount = batchCount + 1
>         val listSize = deleteList.size()
>         try {
>           table.delete(deleteList)
>         } catch {
>           case _: RetriesExhaustedWithDetailsException => {
>             LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: 
> ${batchCount}===Wait 1000 ms, retry......============")
>             Thread.sleep(1000)
>             processDelThrottlingException(table, deleteList, partitionId, 
> batchCount)
>           }
>           case _: ThrottlingException => {
>             LOGGER.warn(s"======partitionId: ${partitionId}===batchCount: 
> ${batchCount}===Wait 1000 ms, retry......============")
>             Thread.sleep(1000)
>             processDelThrottlingException(table, deleteList, partitionId, 
> batchCount)
>           }
>         }
>         LOGGER.warn(s"======partitionId: ${partitionId}===${(batchCount - 1) 
> * batchsize + listSize} rows delete success! ============")
>         accumulator.add(listSize)
>         LOGGER.warn(s"##########################already delete count: 
> ${accumulator.value}#######################")
>       }
>       if (table != null) {
>         table.close()
>       }
>       if (connection != null) {
>         connection.close()
>       }
>     })
>   }
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to