Rahul created PHOENIX-5727:
------------------------------

             Summary: Intermittent Upserts with Kafka and Spark Streaming
                 Key: PHOENIX-5727
                 URL: https://issues.apache.org/jira/browse/PHOENIX-5727
             Project: Phoenix
          Issue Type: Bug
    Affects Versions: 4.14.0
            Reporter: Rahul


Hi,

I have a spark job which reads from kafka stream and writes to a phoenix table 
using Phoenix JDBC thick client with commit size of 500 what i have observed is 
the job silently fails to do upserts without throwing any errors this happens 
intermittently the frequency of data what i get is around 1000 rows/sec.

And my Input data set is such that we will have more updates on the row keys 
than inserts.

is this is known issue with phoenix?

 

Sample Code

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
                        val con_startTimeMillis = System.currentTimeMillis()    
                    val con = 
DriverManager.getConnection("jdbc:phoenix:localhost")                        
println(">>>> time taken for connection::" + (System.currentTimeMillis() - 
con_startTimeMillis).toDouble / 1000 + " secs")                        
con.setAutoCommit(false);                                                for 
loop {                                var a = rec.getAs("A").toString           
                     var b = rec.getAs("B").toString                            
    var c = rec.getAs("C").toString                                var d = if 
(rec.getAs("D") == null) "" else rec.getAs("D").toString                        
        var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString     
                           var f = if (rec.getAs("F") == null) "" else 
rec.getAs("F").toString                                var g = if 
(rec.getAs("G") == null) "" else rec.getAs("G").toString                        
        var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString    
                            var i = if (rec.getAs("I") == null) "" else 
rec.getAs("I").toString 
                                 var upsert_stmt = "upsert into " + 
phoenix_tablename + " values ('" + a + "','" + b  + "','" + c + "','" + d + 
"','" + e  + "','" + f + "','" + g + "','" + h + "')"                           
     println(">>>>upsert statement formed::" + upsert_stmt)                     
            var stmt = con.prepareStatement(upsert_stmt)                        
        stmt.executeUpdate()                                bs=bs+1;            
                    if (bs % commitSize == 0) \{                                
    con.commit()                                }                         }

 

                      con.commit()

                        con.close()

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to