[ https://issues.apache.org/jira/browse/PHOENIX-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rahul updated PHOENIX-5727:
---------------------------
    Description: 
Hi,

I have a Spark job that reads from a Kafka stream and writes to a Phoenix table
using the Phoenix JDBC thick client with a commit size of 500. What I have
observed is that the job intermittently fails to perform the upserts, silently
and without throwing any errors. The incoming data rate is around 1000 rows/sec.

My input data set is such that there are more updates to existing row keys
than inserts.

For every 1000 records we receive, around 990 are updates; what "update" means
here is sketched right below.
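In Phoenix, UPSERT VALUES is insert-or-overwrite on the primary key, so an
"update" is simply an UPSERT that hits an existing (A, B) key. A minimal
illustration follows; the table name MY_TABLE and the values are assumptions,
only the composite key on A and B comes from this report:

    import java.sql.DriverManager

    // The second UPSERT on the same primary key (A, B) overwrites the first
    // row instead of adding a new one. MY_TABLE is hypothetical.
    val con = DriverManager.getConnection("jdbc:phoenix:localhost")
    val stmt = con.createStatement()
    stmt.executeUpdate("UPSERT INTO MY_TABLE (A, B, C) VALUES ('a1', 'b1', 'first')")
    stmt.executeUpdate("UPSERT INTO MY_TABLE (A, B, C) VALUES ('a1', 'b1', 'second')")
    con.commit()   // MY_TABLE now holds one row: ('a1', 'b1', 'second')
    con.close()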

Is this a known issue with Phoenix?

 

Sample Code

A and B form the composite primary key; the commit size is 500.
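For reference, a table shape consistent with the sample below could be created
as follows; this DDL is an assumption (the report does not give the table name
or column types), with only the composite key on (A, B) taken from above:

    import java.sql.DriverManager

    // Hypothetical DDL: the VARCHAR types and MY_TABLE name are assumptions
    // for illustration; only the composite key (A, B) is from the report.
    val ddlCon = DriverManager.getConnection("jdbc:phoenix:localhost")
    ddlCon.createStatement().execute(
      """CREATE TABLE IF NOT EXISTS MY_TABLE (
        |  A VARCHAR NOT NULL,
        |  B VARCHAR NOT NULL,
        |  C VARCHAR, D VARCHAR, E VARCHAR, F VARCHAR, G VARCHAR, H VARCHAR,
        |  CONSTRAINT PK PRIMARY KEY (A, B)
        |)""".stripMargin)
    ddlCon.close()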

 

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")

val con_startTimeMillis = System.currentTimeMillis()
val con = DriverManager.getConnection("jdbc:phoenix:localhost")
println(">>>> time taken for connection::" + (System.currentTimeMillis() - con_startTimeMillis).toDouble / 1000 + " secs")
con.setAutoCommit(false)

// phoenix_tablename and commitSize are defined elsewhere in the job;
// "records" stands for the rows of the current micro-batch.
var bs = 0
for (rec <- records) {
  var a = rec.getAs("A").toString
  var b = rec.getAs("B").toString
  var c = rec.getAs("C").toString
  var d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString
  var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString
  var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString
  var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString
  var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString

  var upsert_stmt = "upsert into " + phoenix_tablename + " values ('" + a + "','" + b + "','" + c + "','" + d + "','" + e + "','" + f + "','" + g + "','" + h + "')"
  println(">>>>upsert statement formed::" + upsert_stmt)

  var stmt = con.prepareStatement(upsert_stmt)
  stmt.executeUpdate()

  bs = bs + 1
  if (bs % commitSize == 0) {
    con.commit()   // flush every commitSize rows to the server
  }
}

con.commit()   // flush the final partial batch
con.close()
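For comparison, here is a minimal sketch (not the original job) of the same
loop using a single parameterized PreparedStatement and explicit error
handling: bind variables avoid the quoting problem of the concatenated SQL
above (a value containing a single quote would produce a malformed UPSERT),
and the try/catch makes any commit failure visible instead of letting it pass
silently. MY_TABLE, the `records` collection (the micro-batch rows, e.g.
Seq[org.apache.spark.sql.Row]), and `commitSize` are assumptions.

    import java.sql.DriverManager

    val con = DriverManager.getConnection("jdbc:phoenix:localhost")
    con.setAutoCommit(false)
    // One parameterized UPSERT, prepared once and reused for every row.
    // MY_TABLE and the column list are assumptions for illustration.
    val upsert = con.prepareStatement(
      "UPSERT INTO MY_TABLE (A, B, C, D, E, F, G, H) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")
    try {
      var bs = 0
      for (rec <- records) {
        upsert.setString(1, rec.getAs[String]("A"))
        upsert.setString(2, rec.getAs[String]("B"))
        upsert.setString(3, rec.getAs[String]("C"))
        upsert.setString(4, Option(rec.getAs[String]("D")).getOrElse(""))
        upsert.setString(5, Option(rec.getAs[String]("E")).getOrElse(""))
        upsert.setString(6, Option(rec.getAs[String]("F")).getOrElse(""))
        upsert.setString(7, Option(rec.getAs[String]("G")).getOrElse(""))
        upsert.setString(8, Option(rec.getAs[String]("H")).getOrElse("0"))
        upsert.executeUpdate()                  // buffered client-side until commit
        bs += 1
        if (bs % commitSize == 0) con.commit()  // flush this batch to the server
      }
      con.commit()                              // flush the final partial batch
    } catch {
      case e: Exception =>
        e.printStackTrace()                     // surface the failure instead of swallowing it
        throw e
    } finally {
      con.close()
    }

Reusing one prepared statement also avoids re-parsing the UPSERT for every row.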

 


> Intermittent Upserts with Kafka and Spark Streaming
> ---------------------------------------------------
>
>                 Key: PHOENIX-5727
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-5727
>             Project: Phoenix
>          Issue Type: Bug
>    Affects Versions: 4.14.0
>            Reporter: Rahul
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
