Mike - I actually got hold of the PIDs for the Spark executors but am facing issues running jstack against them. There are some VM exceptions. I will figure it out and attach the jstack output. Thanks for your patience.
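If jstack keeps failing with VM exceptions, one possible fallback is to take the thread dump from inside the executor JVM itself. This is a minimal sketch assuming only the standard JDK java.lang.management API; the InJvmStackDump class name and the stderr output are illustrative, not from the thread:

    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;

    public class InJvmStackDump {
        // Prints a stack dump of all live threads to stderr, similar in
        // spirit to jstack but taken from inside the running JVM.
        public static void dump() {
            ThreadMXBean threads = ManagementFactory.getThreadMXBean();
            for (ThreadInfo info : threads.dumpAllThreads(false, false)) {
                System.err.println("\"" + info.getThreadName() + "\" state=" + info.getThreadState());
                for (StackTraceElement frame : info.getStackTrace()) {
                    System.err.println("    at " + frame);
                }
                System.err.println();
            }
        }
    }

Triggering this from a periodic thread in the streaming job would show what the Kudu client threads are doing during the hang even when attaching an external tool fails.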
On 6 March 2018 at 20:42, Mike Percy <mpe...@apache.org> wrote:

> Hmm, could you try in Spark local mode? i.e. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
>
> Mike
>
> On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>
>> Mike,
>>
>> Can you clarify a bit on grabbing the jstack for the process? I launched my Spark application and tried to get the PID, with which I thought I could grab a jstack trace during the hang. Unfortunately, I am not able to figure out how to grab the PID for a Spark application.
>>
>> Thanks,
>> Ravi
>>
>> On 6 March 2018 at 18:36, Mike Percy <mpe...@apache.org> wrote:
>>
>>> Thanks Ravi. Would you mind attaching the output of jstack on the process during this hang? That would show what the Kudu client threads are doing, as what we are seeing here is just the netty boss thread.
>>>
>>> Mike
>>>
>>> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>
>>>> Yes, I have debugged to find the root cause. Every logger before "table = client.openTable(tableName);" executes fine; exactly at the point of opening the table, the exception below is thrown and nothing after that is executed. The Spark batches are still being processed, yet opening the table keeps failing. I tried catching the exception, with no luck. Please find the exception below.
>>>>
>>>> 18/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
>>>> java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>>>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>         at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>>         at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>>>>         at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>>>>         at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>>>>         at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>>>>         at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>>>>         at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>>>>         at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>>>>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>         at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>> On 5 March 2018 at 23:52, Mike Percy <mpe...@apache.org> wrote:
>>>>
>>>>> Have you considered checking your session error count or pending errors in your while loop every so often? Can you identify where your code is hanging when the connection is lost (what line)?
>>>>>
>>>>> Mike
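As a concrete sketch of the periodic check Mike suggests, assuming the Kudu Java client API from the Javadoc linked later in this thread (countPendingErrors() and getPendingErrors() are real KuduSession methods; the CheckedWriter class and the 1000-row cadence are illustrative):

    import java.util.Iterator;
    import org.apache.kudu.client.KuduException;
    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.Operation;
    import org.apache.kudu.client.RowError;

    public class CheckedWriter {
        // Applies operations, draining background-flush errors every 1000 rows
        // so a dead tablet server surfaces as a task failure instead of silence.
        static void applyAll(KuduSession session, Iterator<Operation> ops) throws KuduException {
            long applied = 0;
            while (ops.hasNext()) {
                session.apply(ops.next());
                if (++applied % 1000 == 0 && session.countPendingErrors() > 0) {
                    for (RowError err : session.getPendingErrors().getRowErrors()) {
                        System.err.println("Kudu row error: " + err);
                    }
                    throw new RuntimeException("Kudu reported row errors; failing the task");
                }
            }
        }
    }

With AUTO_FLUSH_BACKGROUND, write failures are reported through getPendingErrors() rather than thrown from apply() itself, which matches the Cloudera support comment quoted below; hence the in-loop checks.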
>>>>>
>>>>> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>
>>>>>> In addition to my previous comment, I raised a support ticket for this issue with Cloudera, and one of the support engineers mentioned the following:
>>>>>>
>>>>>> *"Thank you for clarifying, The exceptions are logged but not re-thrown to an upper layer, so that explains why the Spark application is not aware of the underlying error."*
>>>>>>
>>>>>> On 5 March 2018 at 21:02, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>>
>>>>>>> Mike,
>>>>>>>
>>>>>>> Thanks for the information. But once the connection to any of the Kudu servers is lost, there is no way I can keep control of the KuduSession object, and the same goes for getPendingErrors(). The KuduClient in this case becomes a zombie and never returns until the connection is properly re-established. I tried doing all that you suggested, with no luck. Attaching my KuduClient code.
>>>>>>>
>>>>>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>>>>>
>>>>>>> import java.util.HashMap;
>>>>>>> import java.util.Iterator;
>>>>>>> import java.util.Map;
>>>>>>> import org.apache.hadoop.util.ShutdownHookManager;
>>>>>>> import org.apache.kudu.client.*;
>>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>>> import org.slf4j.Logger;
>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>>>>
>>>>>>> public class KuduProcess {
>>>>>>>   private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>>>>   // Assigned from the static methods below, so these must be static to compile.
>>>>>>>   private static KuduTable table;
>>>>>>>   private static KuduSession session;
>>>>>>>
>>>>>>>   public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
>>>>>>>     rdd.foreachPartition(iterator -> {
>>>>>>>       RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
>>>>>>>       int errorCount = errors.getRowErrors().length;
>>>>>>>       if (errorCount > 0) {
>>>>>>>         throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
>>>>>>>       }
>>>>>>>     });
>>>>>>>   }
>>>>>>>
>>>>>>>   private static RowErrorsAndOverflowStatus upsertOpIterator(Iterator<Map<String, Object>> iter, String tableName, String host) {
>>>>>>>     try {
>>>>>>>       AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>>>>       KuduClient client = asyncClient.syncClient();
>>>>>>>       table = client.openTable(tableName);
>>>>>>>       session = client.newSession();
>>>>>>>       session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>>>>       while (iter.hasNext()) {
>>>>>>>         upsertOp(iter.next());
>>>>>>>       }
>>>>>>>     } catch (KuduException e) {
>>>>>>>       logger.error("Exception in upsertOpIterator method", e);
>>>>>>>     } finally {
>>>>>>>       try {
>>>>>>>         session.close();
>>>>>>>       } catch (KuduException e) {
>>>>>>>         logger.error("Exception in Connection close", e);
>>>>>>>       }
>>>>>>>     }
>>>>>>>     // Once the connection is lost, this part of the code never gets called:
>>>>>>>     // the Spark job keeps running and processing records while the KuduClient
>>>>>>>     // tries to reconnect to Kudu. Meanwhile, we are losing all the records.
>>>>>>>     return session.getPendingErrors();
>>>>>>>   }
>>>>>>>
>>>>>>>   public static void upsertOp(Map<String, Object> formattedMap) {
>>>>>>>     if (formattedMap.size() != 0) {
>>>>>>>       try {
>>>>>>>         Upsert upsert = table.newUpsert();
>>>>>>>         PartialRow row = upsert.getRow();
>>>>>>>         for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>>>>           if (entry.getValue().getClass().equals(String.class)) {
>>>>>>>             if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>>>>               row.setNull(entry.getKey());
>>>>>>>             else
>>>>>>>               row.addString(entry.getKey(), (String) entry.getValue());
>>>>>>>           } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>>>>             if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>>>>               row.setNull(entry.getKey());
>>>>>>>             else
>>>>>>>               row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>>>>           } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>>>>             if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>>>>               row.setNull(entry.getKey());
>>>>>>>             else
>>>>>>>               row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>>>>           }
>>>>>>>         }
>>>>>>>
>>>>>>>         session.apply(upsert);
>>>>>>>       } catch (Exception e) {
>>>>>>>         logger.error("Exception during upsert:", e);
>>>>>>>       }
>>>>>>>     }
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> class KuduConnection {
>>>>>>>   private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>>>>>>   private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>>>>>>>   private static int ShutdownHookPriority = 100;
>>>>>>>
>>>>>>>   static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>>>>     if (!asyncCache.containsKey(kuduMaster)) {
>>>>>>>       AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>>>>>>>       ShutdownHookManager.get().addShutdownHook(new Runnable() {
>>>>>>>         @Override
>>>>>>>         public void run() {
>>>>>>>           try {
>>>>>>>             asyncClient.close();
>>>>>>>           } catch (Exception e) {
>>>>>>>             logger.error("Exception closing async client", e);
>>>>>>>           }
>>>>>>>         }
>>>>>>>       }, ShutdownHookPriority);
>>>>>>>       asyncCache.put(kuduMaster, asyncClient);
>>>>>>>     }
>>>>>>>     return asyncCache.get(kuduMaster);
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ravi
>>>>>>>
>>>>>>> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Ravi, it would be helpful if you could attach what you are getting back from getPendingErrors() -- perhaps from dumping RowError.toString() from items in the returned array -- and indicate what you were hoping to get back. Note that a RowError can also return to you the Operation <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation--> that you used to generate the write. From the Operation, you can get the original PartialRow <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html> object, which should be able to identify the affected row that the write failed for. Does that help?
>>>>>>>>
>>>>>>>> Since you are using the Kudu client directly, Spark is not involved from the Kudu perspective, so you will need to deal with Spark on your own in that case.
>>>>>>>>
>>>>>>>> Mike
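To make the RowError to Operation to PartialRow chain Mike describes concrete, here is a minimal sketch of dumping the pending errors; getOperation() and getRow() come from the 1.6.0 Javadoc linked above, while the PendingErrorDumper class itself is illustrative:

    import org.apache.kudu.client.KuduSession;
    import org.apache.kudu.client.Operation;
    import org.apache.kudu.client.RowError;
    import org.apache.kudu.client.RowErrorsAndOverflowStatus;

    public class PendingErrorDumper {
        // Prints each pending row error together with the row whose write failed.
        static void dump(KuduSession session) {
            RowErrorsAndOverflowStatus status = session.getPendingErrors();
            if (status.isOverflowed()) {
                System.err.println("Error buffer overflowed: some errors were discarded");
            }
            for (RowError err : status.getRowErrors()) {
                Operation op = err.getOperation();               // the write that failed
                System.err.println(err + " row=" + op.getRow()); // PartialRow identifies the row
            }
        }
    }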
>>>>>>>>
>>>>>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Mike,
>>>>>>>>>
>>>>>>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>>>>>>>
>>>>>>>>> So, I am using the Kudu client API to perform upserts into Kudu, and I have integrated this with Spark. I am trying to test the case where one of the Kudu servers fails: if there is any problem in writing, getPendingErrors() should give me a way to handle those errors so that I can terminate my Spark job cleanly. This is what I am trying to do.
>>>>>>>>>
>>>>>>>>> But I am not able to get hold of the exceptions thrown from within the KuduClient while it retries connecting to the tablet server. My getPendingErrors() never picks up these exceptions.
>>>>>>>>>
>>>>>>>>> Let me know if you need more clarification. I can post some snippets.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Ravi
>>>>>>>>>
>>>>>>>>> On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>? You mention that you are trying to use getPendingErrors() <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors--> but it sounds like it's not working for you -- can you be more specific about what you expect and what you are observing?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Mike
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Clifford. We are running Kudu 1.4. To date we haven't seen any issues in production, and we are not losing tablet servers. But as part of testing I have to generate a few unforeseen cases to analyse the application's behaviour. One of those is bringing down a tablet server or the master server intentionally, during which I observed the loss of records. I just wanted to test cases outside the happy path here. Once again, thanks for taking the time to respond.
>>>>>>>>>>>
>>>>>>>>>>> - Ravi
>>>>>>>>>>>
>>>>>>>>>>> On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'll have to get back to you on the code bits, but I'm pretty sure we're doing simple sync batching. We're not in production yet, but after some months of development I haven't seen any failures, even when pushing load doing multiple years' backfill. I think the real question is why you are losing tablet servers. The only instability we ever had with Kudu was that weird NTP sync issue, which I think was fixed in 1.6. What version are you running?
>>>>>>>>>>>>
>>>>>>>>>>>> Anyway, I would think that infinite loop should be catchable somewhere. Our pipeline is set to fail/retry with Flink snapshots; I imagine there is something similar with Spark. Sorry I can't be of more help!
>>>>>>>>>>>>
>>>>>>>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Cliff,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the response. Well, I do agree that it's simple and seamless. In my case, I am able to upsert ~25,000 events/sec into Kudu. But I am facing a problem when any of the Kudu tablet servers or the master server is down: I am not able to get hold of the exception from the client. The client goes into an infinite loop trying to connect to Kudu, and meanwhile I am losing my records. I tried handling the errors through getPendingErrors(), but it didn't help. I am using AsyncKuduClient to establish the connection and retrieving the syncClient from the async client to open the session and table. Any help?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Ravi
>>>>>>>>>>>>
>>>>>>>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cre...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> While I can't speak for Spark, we do use the client API from Flink streaming and it's simple and seamless. It's especially nice if you require an upsert semantic.
>>>>>>>>>>>>
>>>>>>>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth....@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Is anyone using Spark Streaming to ingest data into Kudu with the Kudu client API, rather than the traditional KuduContext API? I am stuck at a point and couldn't find a solution.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Ravi
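A closing note on the "infinite loop trying to connect" described above: if the client appears to retry forever, it is worth setting the client-level timeouts explicitly so that openTable() and flushes eventually fail with a KuduException the Spark task can catch. A minimal sketch, assuming the AsyncKuduClientBuilder timeout setters from the Kudu Java client Javadoc (the BoundedKuduClientFactory class and the 30-second values are illustrative):

    import org.apache.kudu.client.AsyncKuduClient;

    public class BoundedKuduClientFactory {
        // Builds an async client whose operations fail fast instead of retrying
        // indefinitely; the 30s values are arbitrary, tune them for your cluster.
        static AsyncKuduClient build(String kuduMaster) {
            return new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
                .defaultAdminOperationTimeoutMs(30_000) // bounds openTable()
                .defaultOperationTimeoutMs(30_000)      // bounds writes/flushes
                .build();
        }
    }

This factory could replace the bare AsyncKuduClientBuilder call in KuduConnection.getAsyncClient() above, turning the silent reconnect loop into an error that the catch block in upsertOpIterator() or the pending-error checks can observe.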