In addition to my previous comment: I raised a support ticket for this issue with Cloudera, and one of the support engineers replied with the following:
*"Thank you for clarifying, The exceptions are logged but not re-thrown to an upper layer, so that explains why the Spark application is not aware of the underlying error."* On 5 March 2018 at 21:02, Ravi Kanth <ravikanth....@gmail.com> wrote: > Mike, > > Thanks for the information. But, once the connection to any of the Kudu > servers is lost then there is no way I can have a control on the > KuduSession object and so with getPendingErrors(). The KuduClient in this > case is becoming a zombie and never returned back till the connection is > properly established. I tried doing all that you have suggested with no > luck. Attaching my KuduClient code. > > package org.dwh.streaming.kudu.sparkkudustreaming; > > import java.util.HashMap; > import java.util.Iterator; > import java.util.Map; > import org.apache.hadoop.util.ShutdownHookManager; > import org.apache.kudu.client.*; > import org.apache.spark.api.java.JavaRDD; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > import org.dwh.streaming.kudu.sparkkudustreaming.constants. > SpecialNullConstants; > > public class KuduProcess { > private static Logger logger = LoggerFactory.getLogger(KuduProcess.class); > private KuduTable table; > private KuduSession session; > > public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String > host, String tableName) { > rdd.foreachPartition(iterator -> { > RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, > host); > int errorCount = errors.getRowErrors().length; > if(errorCount > 0){ > throw new RuntimeException("Failed to write " + errorCount + " messages > into Kudu"); > } > }); > } > private static RowErrorsAndOverflowStatus > upsertOpIterator(Iterator<Map<String, > Object>> iter, String tableName, String host) { > try { > AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host); > KuduClient client = asyncClient.syncClient(); > table = client.openTable(tableName); > session = client.newSession(); > session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_ > BACKGROUND); > while (iter.hasNext()) { > upsertOp(iter.next()); > } > } catch (KuduException e) { > logger.error("Exception in upsertOpIterator method", e); > } > finally{ > try { > session.close(); > } catch (KuduException e) { > logger.error("Exception in Connection close", e); > } > } > return session.getPendingErrors(); ---------------------> Once, > the connection is lost, this part of the code never gets called and the > Spark job will keep on running and processing the records while the > KuduClient is trying to connect to Kudu. Meanwhile, we are loosing all the > records. 
>   }
>
>   public static void upsertOp(Map<String, Object> formattedMap) {
>     if (formattedMap.size() != 0) {
>       try {
>         Upsert upsert = table.newUpsert();
>         PartialRow row = upsert.getRow();
>         for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>           if (entry.getValue().getClass().equals(String.class)) {
>             if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>               row.setNull(entry.getKey());
>             else
>               row.addString(entry.getKey(), (String) entry.getValue());
>           } else if (entry.getValue().getClass().equals(Long.class)) {
>             if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>               row.setNull(entry.getKey());
>             else
>               row.addLong(entry.getKey(), (Long) entry.getValue());
>           } else if (entry.getValue().getClass().equals(Integer.class)) {
>             if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>               row.setNull(entry.getKey());
>             else
>               row.addInt(entry.getKey(), (Integer) entry.getValue());
>           }
>         }
>
>         session.apply(upsert);
>       } catch (Exception e) {
>         logger.error("Exception during upsert:", e);
>       }
>     }
>   }
> }
>
> class KuduConnection {
>   private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>   private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>   private static int ShutdownHookPriority = 100;
>
>   static AsyncKuduClient getAsyncClient(String kuduMaster) {
>     if (!asyncCache.containsKey(kuduMaster)) {
>       AsyncKuduClient asyncClient =
>           new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>       ShutdownHookManager.get().addShutdownHook(new Runnable() {
>         @Override
>         public void run() {
>           try {
>             asyncClient.close();
>           } catch (Exception e) {
>             logger.error("Exception closing async client", e);
>           }
>         }
>       }, ShutdownHookPriority);
>       asyncCache.put(kuduMaster, asyncClient);
>     }
>     return asyncCache.get(kuduMaster);
>   }
> }
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>
>> Hi Ravi, it would be helpful if you could attach what you are getting
>> back from getPendingErrors() -- perhaps by dumping RowError.toString()
>> for the items in the returned array -- and indicate what you were hoping
>> to get back. Note that a RowError can also return to you the Operation
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>> that you used to generate the write. From the Operation, you can get the
>> original PartialRow
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>> object, which should be able to identify the affected row that the write
>> failed for. Does that help?
>>
>> Since you are using the Kudu client directly, Spark is not involved from
>> the Kudu perspective, so you will need to deal with Spark on your own in
>> that case.
>>
>> Mike
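For readers following along, here is a rough sketch of what Mike's suggestion can look like in code: draining the session's pending errors and recovering the originating row of each failed write. It assumes an SLF4J logger in scope; the variable names are mine.

// Sketch only: drain the background error buffer and log the failed rows.
RowErrorsAndOverflowStatus pending = session.getPendingErrors();
if (pending.isOverflowed()) {
  // The buffer is bounded; overflow means some row errors were dropped.
  logger.warn("Pending-error buffer overflowed; some row errors were discarded");
}
for (RowError rowError : pending.getRowErrors()) {
  Operation op = rowError.getOperation();  // the original Upsert/Insert/etc.
  PartialRow row = op.getRow();            // identifies the affected row
  logger.error("Write failed with status {} for row {}",
      rowError.getErrorStatus(), row);
}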
>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>
>>> Hi Mike,
>>>
>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>
>>> I am using the Kudu client API to perform UPSERTs into Kudu, and I have
>>> integrated this with Spark. I am testing the case where one of the Kudu
>>> servers fails. In that case, if there is any problem writing,
>>> getPendingErrors() should give me a way to handle the errors so that I
>>> can cleanly terminate my Spark job. That is what I am trying to do.
>>>
>>> But I am not able to get hold of the exceptions thrown from within the
>>> KuduClient while it retries the connection to the tablet server;
>>> getPendingErrors() is not catching them.
>>>
>>> Let me know if you need more clarification. I can post some snippets.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:
>>>
>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>> You mention that you are trying to use getPendingErrors()
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>> but it sounds like it's not working for you -- can you be more specific
>>>> about what you expect and what you are observing?
>>>>
>>>> Thanks,
>>>> Mike
>>>>
>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>
>>>>> Thanks, Clifford. We are running Kudu 1.4. To date we haven't seen any
>>>>> issues in production, and we are not losing tablet servers. But as
>>>>> part of testing I have to generate a few unforeseen cases to analyse
>>>>> the application's behaviour, and one of them is bringing down a tablet
>>>>> server or the master server intentionally, during which I observed the
>>>>> loss of records. I just wanted to test cases outside the happy path.
>>>>> Once again, thanks for taking the time to respond.
>>>>>
>>>>> - Ravi
>>>>>
>>>>> On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com> wrote:
>>>>>
>>>>>> I'll have to get back to you on the code bits, but I'm pretty sure
>>>>>> we're doing simple sync batching. We're not in production yet, but
>>>>>> after some months of development I haven't seen any failures, even
>>>>>> when pushing load doing multiple years' backfill. I think the real
>>>>>> question is why you are losing tablet servers. The only instability
>>>>>> we ever had with Kudu was that weird ntp sync issue, which I think
>>>>>> was fixed in 1.6. What version are you running?
>>>>>>
>>>>>> Anyway, I would think that infinite loop should be catchable
>>>>>> somewhere. Our pipeline is set to fail/retry with Flink snapshots; I
>>>>>> imagine there is something similar in Spark. Sorry I can't be of more
>>>>>> help!
>>>>>>
>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>>
>>>>>> Cliff,
>>>>>>
>>>>>> Thanks for the response. I do agree that it's simple and seamless; I
>>>>>> am able to upsert ~25,000 events/sec into Kudu. But I am facing a
>>>>>> problem when any Kudu tablet server or the master server is down: I
>>>>>> am not able to get hold of the exception from the client. The client
>>>>>> goes into an infinite loop trying to connect to Kudu, and meanwhile I
>>>>>> am losing my records. I tried handling the errors through
>>>>>> getPendingErrors(), but it didn't help. I am using AsyncKuduClient to
>>>>>> establish the connection and retrieving the syncClient from it to
>>>>>> open the session and table. Any help?
>>>>>>
>>>>>> Thanks,
>>>>>> Ravi
>>>>>>
>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cre...@gmail.com> wrote:
>>>>>>
>>>>>> While I can't speak for Spark, we do use the client API from Flink
>>>>>> streaming, and it's simple and seamless. It's especially nice if you
>>>>>> require an upsert semantic.
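A related note on Mike's flush-mode question: with AUTO_FLUSH_SYNC, apply() flushes each operation immediately and, as I understand the API, throws the KuduException itself, so a write failure can be caught at the call site instead of being parked in the background error buffer. A sketch only -- slower than background flushing, since every write is a round trip, but useful for verifying error handling:

// Sketch: surface write errors at the call site instead of via
// getPendingErrors(). One round trip per apply().
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
try {
  session.apply(upsert);
} catch (KuduException e) {
  // With a bounded operation timeout, a lost tablet server ends up here.
  throw new RuntimeException("Kudu write failed", e);
}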
>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth....@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Is anyone using Spark Streaming to ingest data into Kudu with the
>>>>>> Kudu client API, rather than the traditional KuduContext API? I am
>>>>>> stuck at a point and couldn't find a solution.
>>>>>>
>>>>>> Thanks,
>>>>>> Ravi
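To close the loop on the original question: the alternative is the kudu-spark KuduContext, which, as I understand it, checks for row errors after writing and fails the Spark task itself on error. A minimal sketch, assuming kudu-spark is on the classpath; the master address, table name, and DataFrame are placeholders I made up:

import org.apache.kudu.spark.kudu.KuduContext;

// Hypothetical values: 'spark' is a SparkSession and 'df' is a DataFrame
// whose columns match the Kudu table schema.
KuduContext kuduContext =
    new KuduContext("kudu-master-1:7051", spark.sparkContext());
kuduContext.upsertRows(df, "impala::default.events");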