hi:
First of all thank you very much for replying to my email!
1、I consume a kafka topic, there are multiple tables in this topic,
Insert, Update, Delete operation, this batch only create a KuduSession, submit
will have Insert, Update, Delete operation. If I have 10,000 data in this batch
and add 3,000 new ones, the actual number of kudus will be less than 3,000 and
possibly 1000 or 2,000. One possibility I think is that the data that I update
or need to delete does not exist in kudu. When flush, some data will not be
submitted successfully, but I have no problem in the local test. Loss of data
in the production environment,
2、Below is my code example。
var tables = Array("impala::ucdb.kuduScan1", "impala::ucdb.kuduScan2",
"impala::ucdb.kuduScan3")
var kuduClient = new
KuduClient.KuduClientBuilder("hadoop1:7051,hadoop2:7051,hadoop3:7051").build()
var kuduSession = kuduClient.newSession()
kuduSession.setFlushMode(FlushMode.MANUAL_FLUSH)
kuduSession.setMutationBufferSpace(5)
kuduSession.setFlushInterval(500)
tables.foreach(t => {
var table = kuduClient.openTable(t)
for (i <- 0 until (10)) {
var update = table.newUpdate()
var updateRow = update.getRow
updateRow.addString("id", "#id_" + i + UUID.randomUUID())
updateRow.addString("test1", "tes1^284227349_" + i)
updateRow.addString("test2", "tes2" + i)
updateRow.addString("test3", "test3_" + i)
updateRow.addString("test4", "test4_" + i)
kuduSession.apply(update)
var oper = table.newInsert()
var row = oper.getRow
row.addString("id", "#id_" + i + UUID.randomUUID())
row.addString("test1", "tes1^284227349_" + i)
row.addString("test2", "tes2" + i)
row.addString("test3", "test3_" + i)
row.addString("test4", "test4_" + i)
kuduSession.apply(oper)
var delete = table.newDelete()
var deleteRow = delete.getRow
deleteRow.addString("id", "#id_" + i + UUID.randomUUID())
deleteRow.addString("test1", "tes1^284227349_" + i)
deleteRow.addString("test2", "tes2" + i)
deleteRow.addString("test3", "test3_" + i)
deleteRow.addString("test4", "test4_" + i)
kuduSession.apply(delete)
}
})
var responses= kuduSession.flush()
kuduSession.close()
--
发件人:Todd Lipcon
发送时间:2018年6月15日(星期五) 23:46
收件人:user ; 秦坤
主 题:Re: kudu Insert、Update、Delete operating data lost
Hi,
I'm having trouble understanding your question. Can you give an example of the
operations you are trying and why you believe data is being lost?
-Todd
On Thu, Jun 14, 2018 at 8:24 PM, 秦坤 wrote:
hello:
I use java scan api to operate kudu in large batches If a session contains
Insert, Update, Delete operations, if the database does not exist in the data
there will be some new data loss, how to avoid such problems.
--
Todd Lipcon
Software Engineer, Cloudera