[
https://issues.apache.org/jira/browse/PHOENIX-5727?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rahul updated PHOENIX-5727:
---------------------------
Description:
Hi,
I have a Spark job that reads from a Kafka stream and writes to a Phoenix table
using the Phoenix JDBC thick client with a commit size of 500. I have observed
that the job intermittently fails to apply upserts, silently and without
throwing any errors. The incoming data rate is around 1000 rows/sec.
My input data set has far more updates to existing row keys than inserts:
for every 1000 records we receive, roughly 990 are updates.
If I re-execute the upsert statements that were skipped, they work fine.
Config: salted table with 4 region servers.
Is this a known issue with Phoenix?
Sample Code
A and B form the composite key; the commit size is 500.

Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
val con_startTimeMillis = System.currentTimeMillis()
val con = DriverManager.getConnection("jdbc:phoenix:localhost")
println(">>>> time taken for connection::" + (System.currentTimeMillis() -
  con_startTimeMillis).toDouble / 1000 + " secs")
con.setAutoCommit(false)
var bs = 0
for (rec <- records) {
  var a = rec.getAs("A").toString
  var b = rec.getAs("B").toString
  var c = rec.getAs("C").toString
  var d = if (rec.getAs("D") == null) "" else rec.getAs("D").toString
  var e = if (rec.getAs("E") == null) "" else rec.getAs("E").toString
  var f = if (rec.getAs("F") == null) "" else rec.getAs("F").toString
  var g = if (rec.getAs("G") == null) "" else rec.getAs("G").toString
  var h = if (rec.getAs("H") == null) "0" else rec.getAs("H").toString
  var upsert_stmt = "upsert into " + phoenix_tablename + " values ('" + a +
    "','" + b + "','" + c + "','" + d + "','" + e + "','" + f + "','" + g +
    "','" + h + "')"
  println(">>>>upsert statement formed::" + upsert_stmt)
  var stmt = con.prepareStatement(upsert_stmt)
  stmt.executeUpdate()
  bs = bs + 1
  if (bs % commitSize == 0) {
    con.commit()
  }
}
con.commit()
con.close()
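One thing worth noting about the sample: the UPSERT is built by string concatenation, so any value containing a single quote produces a malformed statement, and every row forces a fresh statement parse. A parameterized UPSERT avoids both problems. The sketch below is illustrative only; `buildUpsert`, `writeBatch`, and the `localhost` quorum are assumptions, not part of the report:

```scala
import java.sql.DriverManager

// Build "UPSERT INTO <table> VALUES (?,?,...)" with one placeholder per column,
// so values never need manual quoting or escaping.
def buildUpsert(table: String, columnCount: Int): String =
  s"UPSERT INTO $table VALUES (${Seq.fill(columnCount)("?").mkString(",")})"

// Sketch of the batched write loop, assuming a reachable Phoenix quorum.
// Each row is a Seq of column values in table order.
def writeBatch(rows: Seq[Seq[String]], table: String, commitSize: Int): Unit = {
  if (rows.isEmpty) return
  val con = DriverManager.getConnection("jdbc:phoenix:localhost")
  con.setAutoCommit(false)
  try {
    val stmt = con.prepareStatement(buildUpsert(table, rows.head.size))
    var pending = 0
    for (row <- rows) {
      row.zipWithIndex.foreach { case (v, i) => stmt.setString(i + 1, v) }
      stmt.executeUpdate()
      pending += 1
      if (pending % commitSize == 0) con.commit() // flush every commitSize rows
    }
    con.commit() // flush the tail of the final, partial batch
  } finally con.close()
}
```

Preparing the statement once outside the loop also lets Phoenix reuse the compiled plan instead of re-parsing per row.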
> Intermittent Upserts with Kafka and Spark Streaming
> ---------------------------------------------------
>
> Key: PHOENIX-5727
> URL: https://issues.apache.org/jira/browse/PHOENIX-5727
> Project: Phoenix
> Issue Type: Bug
> Affects Versions: 4.14.0
> Reporter: Rahul
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)