In addition to my previous comment, I raised a support ticket for this
issue with Cloudera and one of the support person mentioned below,

*"Thank you for clarifying, The exceptions are logged but not re-thrown to
an upper layer, so that explains why the Spark application is not aware of
the underlying error."*

On 5 March 2018 at 21:02, Ravi Kanth <ravikanth....@gmail.com> wrote:

> Mike,
>
> Thanks for the information. But, once the connection to any of the Kudu
> servers is lost then there is no way I can have a control on the
> KuduSession object and so with getPendingErrors(). The KuduClient in this
> case is becoming a zombie and never returned back till the connection is
> properly established. I tried doing all that you have suggested with no
> luck. Attaching my KuduClient code.
>
> package org.dwh.streaming.kudu.sparkkudustreaming;
>
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.Map;
> import org.apache.hadoop.util.ShutdownHookManager;
> import org.apache.kudu.client.*;
> import org.apache.spark.api.java.JavaRDD;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.dwh.streaming.kudu.sparkkudustreaming.constants.
> SpecialNullConstants;
>
> public class KuduProcess {
> private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
> private KuduTable table;
> private KuduSession session;
>
> public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String
> host, String tableName) {
> rdd.foreachPartition(iterator -> {
> RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName,
> host);
> int errorCount = errors.getRowErrors().length;
> if(errorCount > 0){
> throw new RuntimeException("Failed to write " + errorCount + " messages
> into Kudu");
> }
> });
> }
> private static RowErrorsAndOverflowStatus 
> upsertOpIterator(Iterator<Map<String,
> Object>> iter, String tableName, String host) {
> try {
> AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
> KuduClient client = asyncClient.syncClient();
> table = client.openTable(tableName);
> session = client.newSession();
> session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_
> BACKGROUND);
> while (iter.hasNext()) {
> upsertOp(iter.next());
> }
> } catch (KuduException e) {
> logger.error("Exception in upsertOpIterator method", e);
> }
> finally{
> try {
> session.close();
> } catch (KuduException e) {
> logger.error("Exception in Connection close", e);
> }
> }
> return session.getPendingErrors();        ---------------------> Once,
> the connection is lost, this part of the code never gets called and the
> Spark job will keep on running and processing the records while the
> KuduClient is trying to connect to Kudu. Meanwhile, we are loosing all the
> records.
> }
> public static void upsertOp(Map<String, Object> formattedMap) {
> if (formattedMap.size() != 0) {
> try {
> Upsert upsert = table.newUpsert();
> PartialRow row = upsert.getRow();
> for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
> if (entry.getValue().getClass().equals(String.class)) {
> if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
> row.setNull(entry.getKey());
> else
> row.addString(entry.getKey(), (String) entry.getValue());
> } else if (entry.getValue().getClass().equals(Long.class)) {
> if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
> row.setNull(entry.getKey());
> else
> row.addLong(entry.getKey(), (Long) entry.getValue());
> } else if (entry.getValue().getClass().equals(Integer.class)) {
> if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
> row.setNull(entry.getKey());
> else
> row.addInt(entry.getKey(), (Integer) entry.getValue());
> }
> }
>
> session.apply(upsert);
> } catch (Exception e) {
> logger.error("Exception during upsert:", e);
> }
> }
> }
> }
> class KuduConnection {
> private static Logger logger = LoggerFactory.getLogger(
> KuduConnection.class);
> private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
> private static int ShutdownHookPriority = 100;
>
> static AsyncKuduClient getAsyncClient(String kuduMaster) {
> if (!asyncCache.containsKey(kuduMaster)) {
> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
> kuduMaster).build();
> ShutdownHookManager.get().addShutdownHook(new Runnable() {
> @Override
> public void run() {
> try {
> asyncClient.close();
> } catch (Exception e) {
> logger.error("Exception closing async client", e);
> }
> }
> }, ShutdownHookPriority);
> asyncCache.put(kuduMaster, asyncClient);
> }
> return asyncCache.get(kuduMaster);
> }
> }
>
>
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>
>> Hi Ravi, it would be helpful if you could attach what you are getting
>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>> from items in the returned array -- and indicate what you were hoping to
>> get back. Note that a RowError can also return to you the Operation
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>> that you used to generate the write. From the Operation, you can get the
>> original PartialRow
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>> object, which should be able to identify the affected row that the write
>> failed for. Does that help?
>>
>> Since you are using the Kudu client directly, Spark is not involved from
>> the Kudu perspective, so you will need to deal with Spark on your own in
>> that case.
>>
>> Mike
>>
>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth....@gmail.com>
>> wrote:
>>
>>> Hi Mike,
>>>
>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>
>>> So, I am trying to use Kudu Client API to perform UPSERT into Kudu and I
>>> integrated this with Spark. I am trying to test a case where in if any of
>>> Kudu server fails. So, in this case, if there is any problem in writing,
>>> getPendingErrors() should give me a way to handle these errors so that I
>>> can successfully terminate my Spark Job. This is what I am trying to do.
>>>
>>> But, I am not able to get a hold of the exceptions being thrown from
>>> with in the KuduClient when retrying to connect to Tablet Server. My
>>> getPendingErrors is not getting ahold of these exceptions.
>>>
>>> Let me know if you need more clarification. I can post some Snippets.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:
>>>
>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>> You mention that you are trying to use getPendingErrors()
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>  but
>>>> it sounds like it's not working for you -- can you be more specific about
>>>> what you expect and what you are observing?
>>>>
>>>> Thanks,
>>>> Mike
>>>>
>>>>
>>>>
>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth....@gmail.com>
>>>> wrote:
>>>>
>>>>> Thank Clifford. We are running Kudu 1.4 version. Till date we didn't
>>>>> see any issues in production and we are not losing tablet servers. But, as
>>>>> part of testing I have to generate few unforeseen cases to analyse the
>>>>> application performance. One among that is bringing down the tablet server
>>>>> or master server intentionally during which I observed the loss of 
>>>>> records.
>>>>> Just wanted to test cases out of the happy path here. Once again thanks 
>>>>> for
>>>>> taking time to respond to me.
>>>>>
>>>>> - Ravi
>>>>>
>>>>> On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com
>>>>> > wrote:
>>>>>
>>>>>> I'll have to get back to you on the code bits, but I'm pretty sure
>>>>>> we're doing simple sync batching. We're not in production yet, but after
>>>>>> some months of development I haven't seen any failures, even when pushing
>>>>>> load doing multiple years' backfill. I think the real question is why are
>>>>>> you losing tablet servers? The only instability we ever had with Kudu was
>>>>>> when it had that weird ntp sync issue that was fixed I think for 1.6. 
>>>>>> What
>>>>>> version are you running?
>>>>>>
>>>>>> Anyway I would think that infinite loop should be catchable
>>>>>> somewhere. Our pipeline is set to fail/retry with Flink snapshots. I
>>>>>> imagine there is similar with Spark. Sorry I cant be of more help!
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>>
>>>>>> Cliff,
>>>>>>
>>>>>> Thanks for the response. Well, I do agree that its simple and
>>>>>> seamless. In my case, I am able to upsert ~25000 events/sec into Kudu. 
>>>>>> But,
>>>>>> I am facing the problem when any of the Kudu Tablet or master server is
>>>>>> down. I am not able to get a hold of the exception from client. The 
>>>>>> client
>>>>>> is going into an infinite loop trying to connect to Kudu. Meanwhile, I am
>>>>>> loosing my records. I tried handling the errors through 
>>>>>> getPendingErrors()
>>>>>> but still it is helpless. I am using AsyncKuduClient to establish the
>>>>>> connection and retrieving the syncClient from the Async to open the 
>>>>>> session
>>>>>> and table. Any help?
>>>>>>
>>>>>> Thanks,
>>>>>> Ravi
>>>>>>
>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cre...@gmail.com> wrote:
>>>>>>
>>>>>> While I can't speak for Spark, we do use the client API from Flink
>>>>>> streaming and it's simple and seamless. It's especially nice if you 
>>>>>> require
>>>>>> an Upsert semantic.
>>>>>>
>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth....@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Anyone using Spark Streaming to ingest data into Kudu and using Kudu
>>>>>> Client API to do so rather than the traditional KuduContext API? I am 
>>>>>> stuck
>>>>>> at a point and couldn't find a solution.
>>>>>>
>>>>>> Thanks,
>>>>>> Ravi
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to