Mike - I actually got hold of the PIDs for the Spark executors, but I am
running into issues running jstack against them; it is throwing some VM
exceptions. I will figure it out and attach the jstack output. Thanks for
your patience.
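
For reference, the way I am locating the executor PIDs and grabbing the dumps
(exact commands may differ per environment) is roughly:

  jps -lm | grep CoarseGrainedExecutorBackend    # list executor JVMs and their PIDs
  jstack -l <pid> > executor-<pid>.jstack        # thread dump while the job is hanging

If the normal attach keeps failing with a VM exception, jstack -F <pid> can
sometimes still produce a (more limited) forced dump.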

On 6 March 2018 at 20:42, Mike Percy <mpe...@apache.org> wrote:

> Hmm, could you try in spark local mode? i.e.
> https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
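> (For example, launching with spark-submit --master local[*], or calling
> setMaster("local[*]") on the SparkConf -- assuming the hang reproduces
> locally at all.)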
>
> Mike
>
> On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth <ravikanth....@gmail.com>
> wrote:
>
>> Mike,
>>
>> Can you clarify a bit how to grab the jstack for the process? I launched
>> my Spark application and tried to find its PID, thinking I could use it to
>> grab a jstack trace during the hang. Unfortunately, I am not able to figure
>> out how to get the PID for the Spark application.
>>
>> Thanks,
>> Ravi
>>
>> On 6 March 2018 at 18:36, Mike Percy <mpe...@apache.org> wrote:
>>
>>> Thanks Ravi. Would you mind attaching the output of jstack on the
>>> process during this hang? That would show what the Kudu client threads are
>>> doing, as what we are seeing here is just the netty boss thread.
>>>
>>> Mike
>>>
>>> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth <ravikanth....@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Yes, I have debugged this to find the root cause. Every log statement before
>>>> "table = client.openTable(tableName);" executes fine, and exactly at the
>>>> point of opening the table it throws the exception below; nothing after that
>>>> is executed. The Spark batches keep getting processed, but opening the table
>>>> keeps failing. I tried catching the exception, with no luck. Please find the
>>>> exception below.
>>>>
>>>> 8/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
>>>> java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>> On 5 March 2018 at 23:52, Mike Percy <mpe...@apache.org> wrote:
>>>>
>>>>> Have you considered checking your session error count or pending
>>>>> errors in your while loop every so often? Can you identify where your code
>>>>> is hanging when the connection is lost (what line)?
>>>>>
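>>>>> Something along these lines is what I had in mind -- a rough sketch only,
>>>>> reusing the names from your snippet (here the check runs every iteration
>>>>> for simplicity; the threshold and error handling are up to you):
>>>>>
>>>>> while (iter.hasNext()) {
>>>>>     upsertOp(iter.next());
>>>>>     // Periodically check whether background flushes have queued up row errors.
>>>>>     if (session.countPendingErrors() > 0) {
>>>>>         RowErrorsAndOverflowStatus pending = session.getPendingErrors();
>>>>>         for (RowError error : pending.getRowErrors()) {
>>>>>             logger.error("Kudu row error: {}", error);
>>>>>         }
>>>>>         throw new RuntimeException("Aborting partition after "
>>>>>             + pending.getRowErrors().length + " Kudu row errors");
>>>>>     }
>>>>> }
>>>>>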
>>>>> Mike
>>>>>
>>>>> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth <ravikanth....@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> In addition to my previous comment: I raised a support ticket for this
>>>>>> issue with Cloudera, and one of the support engineers mentioned the
>>>>>> following,
>>>>>>
>>>>>> *"Thank you for clarifying, The exceptions are logged but not re-thrown
>>>>>> to an upper layer, so that explains why the Spark application is not
>>>>>> aware of the underlying error."*
>>>>>>
>>>>>> On 5 March 2018 at 21:02, Ravi Kanth <ravikanth....@gmail.com> wrote:
>>>>>>
>>>>>>> Mike,
>>>>>>>
>>>>>>> Thanks for the information. But once the connection to any of the Kudu
>>>>>>> servers is lost, I no longer have any control over the KuduSession object,
>>>>>>> and therefore over getPendingErrors(). The KuduClient in this case becomes
>>>>>>> a zombie and never returns until the connection is properly re-established.
>>>>>>> I tried everything you suggested, with no luck. Attaching my KuduClient
>>>>>>> code.
>>>>>>>
>>>>>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>>>>>
>>>>>>> import java.util.HashMap;
>>>>>>> import java.util.Iterator;
>>>>>>> import java.util.Map;
>>>>>>> import org.apache.hadoop.util.ShutdownHookManager;
>>>>>>> import org.apache.kudu.client.*;
>>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>>> import org.slf4j.Logger;
>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>>>>
>>>>>>> public class KuduProcess {
>>>>>>>     private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>>>>     private static KuduTable table;
>>>>>>>     private static KuduSession session;
>>>>>>>
>>>>>>>     public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
>>>>>>>         rdd.foreachPartition(iterator -> {
>>>>>>>             RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
>>>>>>>             int errorCount = errors.getRowErrors().length;
>>>>>>>             if (errorCount > 0) {
>>>>>>>                 throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
>>>>>>>             }
>>>>>>>         });
>>>>>>>     }
>>>>>>>
>>>>>>>     private static RowErrorsAndOverflowStatus upsertOpIterator(Iterator<Map<String, Object>> iter, String tableName, String host) {
>>>>>>>         try {
>>>>>>>             AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>>>>             KuduClient client = asyncClient.syncClient();
>>>>>>>             table = client.openTable(tableName);
>>>>>>>             session = client.newSession();
>>>>>>>             session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>>>>             while (iter.hasNext()) {
>>>>>>>                 upsertOp(iter.next());
>>>>>>>             }
>>>>>>>         } catch (KuduException e) {
>>>>>>>             logger.error("Exception in upsertOpIterator method", e);
>>>>>>>         } finally {
>>>>>>>             try {
>>>>>>>                 session.close();
>>>>>>>             } catch (KuduException e) {
>>>>>>>                 logger.error("Exception in Connection close", e);
>>>>>>>             }
>>>>>>>         }
>>>>>>>         // ---------------------> Once the connection is lost, the line below never gets
>>>>>>>         // called; the Spark job keeps running and processing records while the KuduClient
>>>>>>>         // keeps trying to reconnect to Kudu. Meanwhile, we are losing all the records.
>>>>>>>         return session.getPendingErrors();
>>>>>>>     }
>>>>>>>
>>>>>>>     public static void upsertOp(Map<String, Object> formattedMap) {
>>>>>>>         if (formattedMap.size() != 0) {
>>>>>>>             try {
>>>>>>>                 Upsert upsert = table.newUpsert();
>>>>>>>                 PartialRow row = upsert.getRow();
>>>>>>>                 for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>>>>                     if (entry.getValue().getClass().equals(String.class)) {
>>>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>>>>                             row.setNull(entry.getKey());
>>>>>>>                         else
>>>>>>>                             row.addString(entry.getKey(), (String) entry.getValue());
>>>>>>>                     } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>>>>                             row.setNull(entry.getKey());
>>>>>>>                         else
>>>>>>>                             row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>>>>                     } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>>>>                         if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>>>>                             row.setNull(entry.getKey());
>>>>>>>                         else
>>>>>>>                             row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>>>>                     }
>>>>>>>                 }
>>>>>>>
>>>>>>>                 session.apply(upsert);
>>>>>>>             } catch (Exception e) {
>>>>>>>                 logger.error("Exception during upsert:", e);
>>>>>>>             }
>>>>>>>         }
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> class KuduConnection {
>>>>>>>     private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>>>>>>     private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>>>>>>>     private static int ShutdownHookPriority = 100;
>>>>>>>
>>>>>>>     static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>>>>         if (!asyncCache.containsKey(kuduMaster)) {
>>>>>>>             AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>>>>>>>             ShutdownHookManager.get().addShutdownHook(new Runnable() {
>>>>>>>                 @Override
>>>>>>>                 public void run() {
>>>>>>>                     try {
>>>>>>>                         asyncClient.close();
>>>>>>>                     } catch (Exception e) {
>>>>>>>                         logger.error("Exception closing async client", e);
>>>>>>>                     }
>>>>>>>                 }
>>>>>>>             }, ShutdownHookPriority);
>>>>>>>             asyncCache.put(kuduMaster, asyncClient);
>>>>>>>         }
>>>>>>>         return asyncCache.get(kuduMaster);
>>>>>>>     }
>>>>>>> }
>>>>>>>
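>>>>>>> One thing I have not tried yet: bounding the client calls with explicit
>>>>>>> timeouts so that openTable() fails fast instead of appearing to hang.
>>>>>>> Just a sketch on my side, not tested -- the timeout values are arbitrary
>>>>>>> and I am assuming these builder options bound openTable() and the
>>>>>>> background flushes:
>>>>>>>
>>>>>>> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
>>>>>>>     .defaultAdminOperationTimeoutMs(30000)  // admin calls such as openTable()
>>>>>>>     .defaultOperationTimeoutMs(30000)       // write/flush operations
>>>>>>>     .build();
>>>>>>>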
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ravi
>>>>>>>
>>>>>>> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Ravi, it would be helpful if you could attach what you are getting
>>>>>>>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>>>>>>>> from items in the returned array -- and indicate what you were hoping to
>>>>>>>> get back. Note that a RowError can also return to you the Operation
>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>>>>>>> that you used to generate the write. From the Operation, you can get the
>>>>>>>> original PartialRow
>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>>>>>>> object, which should be able to identify the affected row that the write
>>>>>>>> failed for. Does that help?
>>>>>>>>
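>>>>>>>> For example, something roughly like this (an untested sketch -- adapt it
>>>>>>>> to wherever you call getPendingErrors() in your code):
>>>>>>>>
>>>>>>>> RowErrorsAndOverflowStatus pending = session.getPendingErrors();
>>>>>>>> if (pending.isOverflowed()) {
>>>>>>>>     System.err.println("Error buffer overflowed; some row errors were discarded");
>>>>>>>> }
>>>>>>>> for (RowError error : pending.getRowErrors()) {
>>>>>>>>     Operation op = error.getOperation();  // the original Upsert/Insert/etc.
>>>>>>>>     PartialRow row = op.getRow();         // the row the failed write was built from
>>>>>>>>     System.err.println("Row error: " + error + " for row: " + row);
>>>>>>>> }
>>>>>>>>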
>>>>>>>> Since you are using the Kudu client directly, Spark is not involved from
>>>>>>>> the Kudu perspective, so you will need to deal with Spark on your own in
>>>>>>>> that case.
>>>>>>>>
>>>>>>>> Mike
>>>>>>>>
>>>>>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth....@gmail.com
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>> Hi Mike,
>>>>>>>>>
>>>>>>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>>>>>>>
>>>>>>>>> So, I am using the Kudu client API to perform UPSERTs into Kudu, and I
>>>>>>>>> have integrated this with Spark. I am trying to test the case where one
>>>>>>>>> of the Kudu servers fails. In that case, if there is any problem writing,
>>>>>>>>> getPendingErrors() should give me a way to handle those errors so that I
>>>>>>>>> can cleanly terminate my Spark job. That is what I am trying to do.
>>>>>>>>>
>>>>>>>>> But I am not able to get hold of the exceptions being thrown from within
>>>>>>>>> the KuduClient while it retries connecting to the tablet server;
>>>>>>>>> getPendingErrors() is not picking them up.
>>>>>>>>>
>>>>>>>>> Let me know if you need more clarification. I can post some snippets.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Ravi
>>>>>>>>>
>>>>>>>>> On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>>>>>>>> You mention that you are trying to use getPendingErrors()
>>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>>>>>>> but it sounds like it's not working for you -- can you be more specific
>>>>>>>>>> about what you expect and what you are observing?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Mike
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <
>>>>>>>>>> ravikanth....@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks, Clifford. We are running Kudu 1.4. To date we haven't seen any
>>>>>>>>>>> issues in production, and we are not losing tablet servers. But as part
>>>>>>>>>>> of testing I have to generate a few unforeseen cases to analyse the
>>>>>>>>>>> application's performance. One of those is intentionally bringing down a
>>>>>>>>>>> tablet server or the master server, during which I observed the loss of
>>>>>>>>>>> records. I just wanted to test cases outside the happy path here. Once
>>>>>>>>>>> again, thanks for taking the time to respond.
>>>>>>>>>>>
>>>>>>>>>>> - Ravi
>>>>>>>>>>>
>>>>>>>>>>> On 26 February 2018 at 19:58, Clifford Resnick <
>>>>>>>>>>> cresn...@mediamath.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I'll have to get back to you on the code bits, but I'm pretty
>>>>>>>>>>>> sure we're doing simple sync batching. We're not in production 
>>>>>>>>>>>> yet, but
>>>>>>>>>>>> after some months of development I haven't seen any failures, even 
>>>>>>>>>>>> when
>>>>>>>>>>>> pushing load doing multiple years' backfill. I think the real 
>>>>>>>>>>>> question is
>>>>>>>>>>>> why are you losing tablet servers? The only instability we ever 
>>>>>>>>>>>> had with
>>>>>>>>>>>> Kudu was when it had that weird ntp sync issue that was fixed I 
>>>>>>>>>>>> think for
>>>>>>>>>>>> 1.6. What version are you running?
>>>>>>>>>>>>
>>>>>>>>>>>> Anyway, I would think that infinite loop should be catchable somewhere.
>>>>>>>>>>>> Our pipeline is set to fail/retry with Flink snapshots; I imagine there
>>>>>>>>>>>> is something similar with Spark. Sorry I can't be of more help!
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth....@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Cliff,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the response. Well, I do agree that it's simple and seamless.
>>>>>>>>>>>> In my case, I am able to upsert ~25000 events/sec into Kudu. But I am
>>>>>>>>>>>> facing a problem when any of the Kudu tablet or master servers is down:
>>>>>>>>>>>> I am not able to get hold of the exception from the client. The client
>>>>>>>>>>>> goes into an infinite loop trying to connect to Kudu, and meanwhile I am
>>>>>>>>>>>> losing my records. I tried handling the errors through getPendingErrors(),
>>>>>>>>>>>> but that hasn't helped. I am using an AsyncKuduClient to establish the
>>>>>>>>>>>> connection and retrieving the syncClient from it to open the session and
>>>>>>>>>>>> table. Any help?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Ravi
>>>>>>>>>>>>
>>>>>>>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cre...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> While I can't speak for Spark, we do use the client API from Flink
>>>>>>>>>>>> streaming and it's simple and seamless. It's especially nice if you
>>>>>>>>>>>> require upsert semantics.
>>>>>>>>>>>>
>>>>>>>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth....@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Is anyone using Spark Streaming to ingest data into Kudu via the Kudu
>>>>>>>>>>>> client API rather than the traditional KuduContext API? I am stuck at a
>>>>>>>>>>>> point and couldn't find a solution.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Ravi
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
