Mike,

Can you clarify a bit how to grab the jstack for the process? I launched my Spark application and tried to get the pid, thinking I could use it to grab a jstack trace during the hang. Unfortunately, I am not able to figure out how to get the pid of the Spark application.
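(One way to do this, assuming the executors run on YARN worker nodes: log into the node hosting the executor, run jps -lm to list the JVM pids with their main classes -- the executors typically show up as CoarseGrainedExecutorBackend -- and then run jstack <pid> against that pid while the application is hung. The exact process name can vary with the deployment mode, so treat this as a sketch rather than a definitive recipe.)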
Thanks,
Ravi

On 6 March 2018 at 18:36, Mike Percy <mpe...@apache.org> wrote:

Thanks Ravi. Would you mind attaching the output of jstack on the process during this hang? That would show what the Kudu client threads are doing, as what we are seeing here is just the netty boss thread.

Mike

On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth <ravikanth....@gmail.com> wrote:

Yes, I have debugged to find the root cause. Every logger before "table = client.openTable(tableName);" executes fine, and exactly at the point of opening the table it throws the exception below; nothing is executed after that. The Spark batches are still being processed, but opening the table keeps failing. I tried catching the exception, with no luck. Please find the exception below.

8/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Thanks,
Ravi

On 5 March 2018 at 23:52, Mike Percy <mpe...@apache.org> wrote:

Have you considered checking your session error count or pending errors in your while loop every so often? Can you identify where your code is hanging when the connection is lost (what line)?

Mike

On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:

In addition to my previous comment, I raised a support ticket for this issue with Cloudera, and one of the support engineers said the following:

"Thank you for clarifying. The exceptions are logged but not re-thrown to an upper layer, so that explains why the Spark application is not aware of the underlying error."

On 5 March 2018 at 21:02, Ravi Kanth <ravikanth....@gmail.com> wrote:

Mike,

Thanks for the information. But once the connection to any of the Kudu servers is lost, there is no way for me to get control of the KuduSession object, and hence of getPendingErrors(). The KuduClient in this case becomes a zombie and never comes back until the connection is properly re-established. I tried doing all that you suggested, with no luck.
Attaching my KuduClient code.

package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.kudu.client.*;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;

public class KuduProcess {
  private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
  private static KuduTable table;
  private static KuduSession session;

  public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
    rdd.foreachPartition(iterator -> {
      RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
      int errorCount = errors.getRowErrors().length;
      if (errorCount > 0) {
        throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
      }
    });
  }

  private static RowErrorsAndOverflowStatus upsertOpIterator(Iterator<Map<String, Object>> iter,
      String tableName, String host) {
    try {
      AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
      KuduClient client = asyncClient.syncClient();
      table = client.openTable(tableName);
      session = client.newSession();
      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
      while (iter.hasNext()) {
        upsertOp(iter.next());
      }
    } catch (KuduException e) {
      logger.error("Exception in upsertOpIterator method", e);
    } finally {
      try {
        session.close();
      } catch (KuduException e) {
        logger.error("Exception in Connection close", e);
      }
    }
    return session.getPendingErrors();  // <-- Once the connection is lost, this part of the code
                                        // never gets called; the Spark job keeps running and
                                        // processing records while the KuduClient is trying to
                                        // connect to Kudu. Meanwhile, we are losing all the records.
  }

  public static void upsertOp(Map<String, Object> formattedMap) {
    if (formattedMap.size() != 0) {
      try {
        Upsert upsert = table.newUpsert();
        PartialRow row = upsert.getRow();
        for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
          if (entry.getValue().getClass().equals(String.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
              row.setNull(entry.getKey());
            else
              row.addString(entry.getKey(), (String) entry.getValue());
          } else if (entry.getValue().getClass().equals(Long.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
              row.setNull(entry.getKey());
            else
              row.addLong(entry.getKey(), (Long) entry.getValue());
          } else if (entry.getValue().getClass().equals(Integer.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
              row.setNull(entry.getKey());
            else
              row.addInt(entry.getKey(), (Integer) entry.getValue());
          }
        }
        session.apply(upsert);
      } catch (Exception e) {
        logger.error("Exception during upsert:", e);
      }
    }
  }
}

class KuduConnection {
  private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
  private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
  private static int ShutdownHookPriority = 100;

  static AsyncKuduClient getAsyncClient(String kuduMaster) {
    if (!asyncCache.containsKey(kuduMaster)) {
      AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
      ShutdownHookManager.get().addShutdownHook(new Runnable() {
        @Override
        public void run() {
          try {
            asyncClient.close();
          } catch (Exception e) {
            logger.error("Exception closing async client", e);
          }
        }
      }, ShutdownHookPriority);
      asyncCache.put(kuduMaster, asyncClient);
    }
    return asyncCache.get(kuduMaster);
  }
}

Thanks,
Ravi

On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:

Hi Ravi, it would be helpful if you could attach what you are getting back from getPendingErrors() -- perhaps from dumping RowError.toString() from items in the returned array -- and indicate what you were hoping to get back. Note that a RowError can also return to you the Operation <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation--> that you used to generate the write. From the Operation, you can get the original PartialRow <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html> object, which should be able to identify the affected row that the write failed for. Does that help?

Since you are using the Kudu client directly, Spark is not involved from the Kudu perspective, so you will need to deal with Spark on your own in that case.

Mike

On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:

Hi Mike,

Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.

I am using the Kudu client API to perform upserts into Kudu, and I have integrated this with Spark. I am trying to test the case where one of the Kudu servers fails. In that case, if there is any problem writing, getPendingErrors() should give me a way to handle the errors so that I can terminate my Spark job cleanly. That is what I am trying to do.

But I am not able to get hold of the exceptions thrown from within the KuduClient while it retries the connection to the tablet server. My getPendingErrors() call never sees these exceptions.

Let me know if you need more clarification. I can post some snippets.

Thanks,
Ravi
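As a side note, one thing that might make the failure visible sooner is building the cached client with explicit timeouts. This is only a sketch, assuming the AsyncKuduClientBuilder timeout options documented for Kudu 1.6 behave the same way in 1.4; the class name KuduClientFactory and the 10-second values are illustrative, not taken from the attached code.

  import org.apache.kudu.client.AsyncKuduClient;

  public class KuduClientFactory {  // illustrative name, not from the attached code
    public static AsyncKuduClient create(String kuduMaster) {
      return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
          // Bound admin calls such as openTable(); illustrative value.
          .defaultAdminOperationTimeoutMs(10_000)
          // Bound writes/flushes issued through sessions of this client; illustrative value.
          .defaultOperationTimeoutMs(10_000)
          .build();
    }
  }

With bounds like these in place, openTable() and session flushes should fail with a KuduException once the deadline elapses, which at least turns the "zombie" client into a visible error that the surrounding try/catch can act on.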
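A minimal sketch of the periodic error check Mike suggested earlier in the thread, written as a method that could sit next to upsertOpIterator() in the KuduProcess class above. It assumes AUTO_FLUSH_BACKGROUND; CHECK_EVERY, the log message, and the decision to throw are illustrative assumptions rather than part of the original code.

  // Poll the session for accumulated row errors every CHECK_EVERY applied operations,
  // so a dead tablet server is noticed while the partition is still being processed
  // rather than only after the iterator is exhausted.
  private static RowErrorsAndOverflowStatus upsertWithErrorChecks(
      KuduSession session, Iterator<Map<String, Object>> iter) throws KuduException {
    final int CHECK_EVERY = 1000;          // illustrative check interval
    int applied = 0;
    while (iter.hasNext()) {
      upsertOp(iter.next());               // same per-row upsert as in KuduProcess above
      if (++applied % CHECK_EVERY == 0 && session.countPendingErrors() > 0) {
        RowErrorsAndOverflowStatus pending = session.getPendingErrors();
        for (RowError error : pending.getRowErrors()) {
          // RowError.getOperation() would also hand back the failed Operation and its PartialRow.
          logger.error("Kudu row error: {}", error.toString());
        }
        // Fail the partition now so Spark can retry or abort, instead of silently dropping rows.
        throw new RuntimeException("Aborting partition after "
            + pending.getRowErrors().length + " row errors");
      }
    }
    session.flush();                       // push any still-buffered operations
    return session.getPendingErrors();     // errors, if any, from the final flush
  }

Whether this catches the failure in time depends on how long apply() itself blocks once the connection is gone, so it is a complement to, not a replacement for, client-side timeouts.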
On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:

Hi Ravi, are you using AUTO_FLUSH_BACKGROUND <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>? You mention that you are trying to use getPendingErrors() <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors--> but it sounds like it's not working for you -- can you be more specific about what you expect and what you are observing?

Thanks,
Mike

On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:

Thanks, Clifford. We are running Kudu 1.4. To date we haven't seen any issues in production, and we are not losing tablet servers. But as part of testing I have to generate a few unforeseen cases to analyse the application's behaviour. One of them is bringing down the tablet server or master server intentionally, during which I observed the loss of records. I just wanted to test cases outside the happy path here. Once again, thanks for taking the time to respond.

- Ravi

On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com> wrote:

I'll have to get back to you on the code bits, but I'm pretty sure we're doing simple sync batching. We're not in production yet, but after some months of development I haven't seen any failures, even when pushing load doing multiple years' backfill. I think the real question is why are you losing tablet servers? The only instability we ever had with Kudu was that weird ntp sync issue, which I think was fixed in 1.6. What version are you running?

Anyway, I would think that infinite loop should be catchable somewhere. Our pipeline is set to fail/retry with Flink snapshots. I imagine there is something similar with Spark. Sorry I can't be of more help!

On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:

Cliff,

Thanks for the response. Well, I do agree that it's simple and seamless. In my case, I am able to upsert ~25,000 events/sec into Kudu. But I am facing a problem when any of the Kudu tablet or master servers is down: I am not able to get hold of the exception from the client. The client goes into an infinite loop trying to connect to Kudu, and meanwhile I am losing my records. I tried handling the errors through getPendingErrors(), but it didn't help. I am using AsyncKuduClient to establish the connection and retrieving the syncClient from the async client to open the session and table. Any help?

Thanks,
Ravi
On 26 February 2018 at 18:00, Cliff Resnick <cre...@gmail.com> wrote:

While I can't speak for Spark, we do use the client API from Flink streaming and it's simple and seamless. It's especially nice if you require an upsert semantic.

On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth....@gmail.com> wrote:

Hi,

Is anyone using Spark Streaming to ingest data into Kudu with the Kudu client API, rather than the traditional KuduContext API? I am stuck at a point and couldn't find a solution.

Thanks,
Ravi