Joal has uploaded a new change for review.
https://gerrit.wikimedia.org/r/282338
Change subject: Correct CqlRecordWriter in cassandra module
......................................................................
Correct CqlRecordWriter in cassandra module
CqlRecordWriter was looping endlessly (causing a mapreduce timeout) when
the data to be inserted into cassandra was small enough. The multithreaded
approach, combined with the small amount of data, led to an Interrupt
signal being sent to the Cassandra Driver during connection initialisation,
which caused reconnection to fail. The approach taken is to regularly check
for the end-of-job condition instead of blocking and waiting to be
interrupted. Although not perfectly clean, this method at least prevents
the observed failure.
Change-Id: If851c3e97f5c5ff083fb9775c8665ff35355f2d1
---
M
refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
1 file changed, 66 insertions(+), 30 deletions(-)
git pull ssh://gerrit.wikimedia.org:29418/analytics/refinery/source
refs/changes/38/282338/1
diff --git
a/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
b/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
index b53a417..c06e704 100644
---
a/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
+++
b/refinery-cassandra/src/main/java/org/wikimedia/analytics/refinery/cassandra/CqlRecordWriter.java
@@ -90,6 +90,7 @@
CqlRecordWriter(Configuration conf)
{
+ logger.debug("Constructing new MultiThreadCqlRecordWriter");
this.conf = conf;
this.queueSize = conf.getInt(ColumnFamilyOutputFormat.QUEUE_SIZE,
32 * FBUtilities.getAvailableProcessors());
batchThreshold =
conf.getLong(ColumnFamilyOutputFormat.BATCH_THRESHOLD, 32);
@@ -152,6 +153,7 @@
public void close() throws IOException
{
// close all the clients before throwing anything
+ logger.debug("Closing MultiThreadCqlRecordWriter");
IOException clientException = null;
for (RangeClient client : clients.values())
{
@@ -173,6 +175,7 @@
@Override
public void write(Map<String, ByteBuffer> keyColumns, List<ByteBuffer>
values) throws IOException
{
+ logger.debug("Writing with MultiThreadCqlRecordWriter");
TokenRange range = ringCache.getRange(getPartitionKey(keyColumns));
// get the client for the given range, or create a new one
@@ -207,6 +210,7 @@
*/
public class RangeClient extends Thread
{
+ protected final String name;
// The list of endpoints for this range
protected final List<InetAddress> endpoints;
protected Session client;
@@ -226,6 +230,8 @@
public RangeClient(List<InetAddress> endpoints)
{
super("client-" + endpoints);
+ this.name = "RangeClient[client-" + endpoints + "]";
+ logger.debug("Constructing new RangeClient[" + name + "]");
this.endpoints = endpoints;
}
@@ -234,6 +240,7 @@
*/
public void put(List<ByteBuffer> value) throws IOException
{
+ logger.debug("Putting new value in async queue");
while (true)
{
if (lastException != null)
@@ -258,10 +265,14 @@
outer:
while (run || !queue.isEmpty())
{
+ logger.debug("Async Run - Looping while (run ||
!queue.isEmpty) (execution loop)");
List<ByteBuffer> bindVariables;
try
{
- bindVariables = queue.take();
+ logger.debug("Async Run - Getting first batch value to
insert into cassandra");
+ bindVariables = queue.poll(1, TimeUnit.SECONDS);
+ // re-check loop condition if no data available
+ if (bindVariables == null) continue ;
}
catch (InterruptedException e)
{
@@ -270,15 +281,20 @@
}
ListIterator<InetAddress> iter = endpoints.listIterator();
+ // Initialise client if not already done
+ if ((client == null) && (!attempt_connect(iter))) break
outer;
+
while (true)
{
- // send the mutation to the last-used endpoint. first
time through, this will NPE harmlessly.
+ logger.debug("Async Run - Looping inserting batches
into cassandra available client");
+ // send the mutation to the last-used endpoint.
try
{
int i = 0;
PreparedStatement statement =
preparedStatement(client);
while (bindVariables != null)
{
+ logger.debug("Async Run - Looping inserting
value " + Integer.toString(i) + " into cassandra with selected client");
BoundStatement boundStatement = new
BoundStatement(statement);
for (int columnPosition = 0; columnPosition <
bindVariables.size(); columnPosition++)
{
@@ -291,57 +307,73 @@
i++;
if (i >= batchThreshold)
+ {
+ logger.debug("Async Run - Batch full -
Breaking values insertion loop");
break;
+ }
bindVariables = queue.poll();
}
+ logger.debug("Async Run - Batch full or no more
values to insert - Breaking batches insertion loop");
break;
}
catch (Exception e)
{
+ logger.debug("Async Run - Error while inserting
batch with selected client", e);
closeInternal();
if (!iter.hasNext())
{
+ logger.debug("Async Run - No other client to
try to reach, breaking execution loop");
lastException = new IOException(e);
break outer;
}
}
// attempt to connect to a different endpoint
- try
- {
- InetAddress address = iter.next();
- String host = address.getHostName();
- client = CqlConfigHelper.getOutputCluster(host,
conf).connect();
- }
- catch (Exception e)
- {
- //If connection died due to Interrupt, just try
connecting to the endpoint again.
- //There are too many ways for the
Thread.interrupted() state to be cleared, so
- //we can't rely on that here. Until the java
driver gives us a better way of knowing
- //that this exception came from an
InterruptedException, this is the best solution.
- if (canRetryDriverConnection(e))
- {
- iter.previous();
- }
- closeInternal();
-
- // Most exceptions mean something unexpected went
wrong to that endpoint, so
- // we should try again to another. Other
exceptions (auth or invalid request) are fatal.
- if ((e instanceof AuthenticationException || e
instanceof InvalidQueryException) || !iter.hasNext())
- {
- lastException = new IOException(e);
- break outer;
- }
- }
+ if (!attempt_connect(iter)) break outer;
}
}
// close all our connections once we are done.
closeInternal();
}
+ private boolean attempt_connect(ListIterator<InetAddress> iter) {
+ try
+ {
+ logger.debug("Async Run - Try a new client from the list");
+ InetAddress address = iter.next();
+ String host = address.getHostName();
+ client = CqlConfigHelper.getOutputCluster(host,
conf).connect();
+ }
+ catch (Exception e)
+ {
+ //If connection died due to Interrupt, just try connecting
to the endpoint again.
+ //There are too many ways for the Thread.interrupted()
state to be cleared, so
+ //we can't rely on that here. Until the java driver gives
us a better way of knowing
+ //that this exception came from an InterruptedException,
this is the best solution.
+ if (canRetryDriverConnection(e))
+ {
+ logger.debug("Async Run - Error is not critical,
trying with same endpoint");
+ iter.previous();
+ }
+ closeInternal();
+
+ // Most exceptions mean something unexpected went wrong to
that endpoint, so
+ // we should try again to another. Other exceptions (auth
or invalid request) are fatal.
+ if ((e instanceof AuthenticationException || e instanceof
InvalidQueryException) || !iter.hasNext())
+ {
+ logger.debug("Async Run - Error is critical, breaking
execution loop", e);
+ lastException = new IOException(e);
+ return false;
+ }
+ }
+ return true;
+ }
+
+
/** get prepared statement id from cache, otherwise prepare it
from Cassandra server*/
private PreparedStatement preparedStatement(Session client)
{
+ logger.debug("Getting prepared statement");
PreparedStatement statement = preparedStatements.get(client);
if (statement == null)
{
@@ -363,9 +395,11 @@
public void close() throws IOException
{
- // stop the run loop. this will result in closeInternal being
called by the time join() finishes.
+ logger.debug("Closing external");
+ // stop the run loop (Sending Interrupt signal is causing
driver failure, so we rely on
+ // regular condition checking instead).
+ // this will result in closeInternal being called by the time
join() finishes.
run = false;
- interrupt();
try
{
this.join();
@@ -381,6 +415,7 @@
protected void closeInternal()
{
+ logger.debug("Closing internal");
if (client != null)
{
client.close();
@@ -389,6 +424,7 @@
private boolean canRetryDriverConnection(Exception e)
{
+ logger.debug("Checking retry connection");
if (e instanceof DriverException &&
e.getMessage().contains("Connection thread interrupted"))
return true;
if (e instanceof NoHostAvailableException)
--
To view, visit https://gerrit.wikimedia.org/r/282338
To unsubscribe, visit https://gerrit.wikimedia.org/r/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: If851c3e97f5c5ff083fb9775c8665ff35355f2d1
Gerrit-PatchSet: 1
Gerrit-Project: analytics/refinery/source
Gerrit-Branch: master
Gerrit-Owner: Joal <[email protected]>
_______________________________________________
MediaWiki-commits mailing list
[email protected]
https://lists.wikimedia.org/mailman/listinfo/mediawiki-commits