Re: Spark Streaming + Kudu

2018-03-06 Thread Ravi Kanth
Mike, I actually got hold of the PIDs for the Spark executors, but I am facing
issues running jstack: it fails with some VM exceptions. I will figure it
out and attach the jstack output. Thanks for your patience.
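For anyone following along, one way to locate the executor PIDs on a worker node is `jps`, since Spark executors run as `CoarseGrainedExecutorBackend` JVMs. This is a sketch: the `jps` output below is fabricated so the example is self-contained, and note that jstack generally must be run as the same OS user that owns the target JVM (a mismatch is a common cause of jstack failing against a live process).

```shell
# On a real worker node you would run:  jps -lm
# Fabricated jps output, captured here so the example runs standalone:
JPS_OUT='12345 org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://...
67890 sun.tools.jps.Jps -lm'

# Spark executors run as CoarseGrainedExecutorBackend; extract their PIDs:
EXEC_PIDS=$(printf '%s\n' "$JPS_OUT" | awk '/CoarseGrainedExecutorBackend/ {print $1}')
echo "$EXEC_PIDS"

# Then, during the hang, as the JVM's owning user (often the yarn/spark user):
#   jstack "$PID" > "executor-$PID.jstack"
# If the JVM does not respond, jstack -F may still work, but it pauses the process.
```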

On 6 March 2018 at 20:42, Mike Percy  wrote:

> Hmm, could you try in spark local mode? i.e. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
>
> Mike
>
> On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth 
> wrote:
>
>> Mike,
>>
>> Can you clarify a bit how to grab the jstack for the process? I launched
>> my Spark application and tried to get the PID so that I could grab a
>> jstack trace during the hang. Unfortunately, I am not able to figure out
>> how to find the PID for a Spark application.
>>
>> Thanks,
>> Ravi
>>
>> On 6 March 2018 at 18:36, Mike Percy  wrote:
>>
>>> Thanks Ravi. Would you mind attaching the output of jstack on the
>>> process during this hang? That would show what the Kudu client threads are
>>> doing, since what we are seeing here is just the netty boss thread.
>>>
>>> Mike
>>>
>>> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth 
>>> wrote:
>>>

 Yes, I have debugged to find the root cause. Every log statement before
 "table = client.openTable(tableName);" executes fine, and exactly at the
 point of opening the table it throws the exception below; nothing
 executes after that. The Spark batches are still being processed, but
 opening the table keeps failing. I tried catching the exception with no luck.
 Please find the exception below.

 8/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
 java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
 at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
 at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


 Thanks,
 Ravi

 On 5 March 2018 at 23:52, Mike Percy  wrote:

> Have you considered checking your session error count or pending
> errors in your while loop every so often? Can you identify where your code
> is hanging when the connection is lost (what line)?
>
> Mike
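Mike's suggestion — polling the session's error state every so often inside the write loop, instead of only after the whole batch — could be sketched as follows. The analogous calls in the Kudu Java client are `countPendingErrors()` / `getPendingErrors()` on the session; the tiny `WriteSession` interface below is a hand-rolled stand-in so the sketch compiles and runs without the Kudu client on the classpath.

```java
import java.util.Arrays;
import java.util.Iterator;

// Sketch: poll for buffered row errors every N applied operations, rather
// than checking once at the end (by which point a wedged client may never
// return). WriteSession is a stand-in for a Kudu session.
public class PeriodicErrorCheck {
    interface WriteSession {                 // stand-in for KuduSession
        void apply(Object op);               // stand-in for session.apply(upsert)
        long countPendingErrors();           // stand-in for the Kudu call
    }

    /** Applies every row, checking for buffered errors every {@code checkEvery} ops. */
    static long upsertAll(Iterator<?> rows, WriteSession session, int checkEvery) {
        long applied = 0, errorsSeen = 0;
        while (rows.hasNext()) {
            session.apply(rows.next());
            if (++applied % checkEvery == 0) {
                long n = session.countPendingErrors();
                if (n > 0) {
                    errorsSeen += n;
                    // Real code would drain session.getPendingErrors() here,
                    // log each RowError, and decide whether to abort the batch
                    // instead of silently continuing.
                }
            }
        }
        return errorsSeen;
    }
}
```

This surfaces write failures mid-batch, which is exactly the visibility the thread says is missing.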
>
> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth 
> wrote:
>
>> In addition to my previous comment, I raised a support ticket for
>> this issue with Cloudera, and one of the support engineers mentioned the following:
>>
>> *"Thank you for clarifying, The exceptions are logged but not
>> re-thrown to an upper layer, so that explains why the Spark application
>> is not aware of the underlying error."*
>>
>> On 5 March 2018 at 21:02, Ravi Kanth  wrote:
>>
>>> Mike,
>>>
>>> Thanks for the information. But once the connection to any of the
>>> Kudu servers is lost, there is no way I can get control of the
>>> KuduSession object, and likewise with getPendingErrors(). The KuduClient
>>> in this case becomes a zombie and never returns until the connection is
>>> properly re-established. I tried everything you suggested with no
>>> luck. Attaching my KuduClient code.
>>>
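Since the complaint throughout the thread is that `client.openTable()` can block indefinitely when the Kudu servers are unreachable, one generic JDK-level guard is to bound any blocking call with a `Future` deadline. This is a plain-JDK sketch, not Kudu API; the Kudu client builders also expose their own timeout settings (e.g. an operation-timeout knob such as `defaultOperationTimeoutMs`), which would be the more idiomatic fix.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Generic guard for a blocking call that may never return (like a hanging
// openTable()): run it on a worker thread and wait with a deadline.
public class BoundedCall {
    static <T> T callWithTimeout(Callable<T> call, long timeoutMs) throws Exception {
        ExecutorService ex = Executors.newSingleThreadExecutor();
        try {
            // get() throws TimeoutException if the call is still stuck after timeoutMs
            return ex.submit(call).get(timeoutMs, TimeUnit.MILLISECONDS);
        } finally {
            ex.shutdownNow(); // interrupt the stuck call instead of leaking the thread
        }
    }
}
```

With this, a call like `callWithTimeout(() -> client.openTable(tableName), 30_000)` (hypothetical usage) would surface a `TimeoutException` the surrounding code can log and retry, instead of wedging the Spark task silently.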

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
In addition to my previous comment, I raised a support ticket for this
issue with Cloudera, and one of the support engineers mentioned the following:

*"Thank you for clarifying, The exceptions are logged but not re-thrown to
an upper layer, so that explains why the Spark application is not aware of
the underlying error."*

On 5 March 2018 at 21:02, Ravi Kanth  wrote:

> Mike,
>
> Thanks for the information. But once the connection to any of the Kudu
> servers is lost, there is no way I can get control of the KuduSession
> object, and likewise with getPendingErrors(). The KuduClient in this
> case becomes a zombie and never returns until the connection is
> properly re-established. I tried everything you suggested with no
> luck. Attaching my KuduClient code.
>
> package org.dwh.streaming.kudu.sparkkudustreaming;
>
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.Map;
> import org.apache.hadoop.util.ShutdownHookManager;
> import org.apache.kudu.client.*;
> import org.apache.spark.api.java.JavaRDD;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>
> public class KuduProcess {
>   private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>   private static KuduTable table;
>   private static KuduSession session;
>
>   public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
>     rdd.foreachPartition(iterator -> {
>       RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
>       int errorCount = errors.getRowErrors().length;
>       if (errorCount > 0) {
>         throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
>       }
>     });
>   }
>
>   private static RowErrorsAndOverflowStatus upsertOpIterator(Iterator<Map<String, Object>> iter, String tableName, String host) {
>     try {
>       AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>       KuduClient client = asyncClient.syncClient();
>       table = client.openTable(tableName);
>       session = client.newSession();
>       session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>       while (iter.hasNext()) {
>         upsertOp(iter.next());
>       }
>     } catch (KuduException e) {
>       logger.error("Exception in upsertOpIterator method", e);
>     } finally {
>       try {
>         session.close();
>       } catch (KuduException e) {
>         logger.error("Exception in Connection close", e);
>       }
>     }
>     return session.getPendingErrors(); // <- Once the connection is lost, this part of the
>                                        // code never gets called; the Spark job keeps running
>                                        // and processing records while the KuduClient is trying
>                                        // to connect to Kudu. Meanwhile, we are losing all the
>                                        // records.
>   }
>
>   public static void upsertOp(Map<String, Object> formattedMap) {
>     if (formattedMap.size() != 0) {
>       try {
>         Upsert upsert = table.newUpsert();
>         PartialRow row = upsert.getRow();
>         for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>           if (entry.getValue().getClass().equals(String.class)) {
>             if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>               row.setNull(entry.getKey());
>             else
>               row.addString(entry.getKey(), (String) entry.getValue());
>           } else if (entry.getValue().getClass().equals(Long.class)) {
>             if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>               row.setNull(entry.getKey());
>             else
>               row.addLong(entry.getKey(), (Long) entry.getValue());
>           } else if (entry.getValue().getClass().equals(Integer.class)) {
>             if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>               row.setNull(entry.getKey());
>             else
>               row.addInt(entry.getKey(), (Integer) entry.getValue());
>           }
>         }
>
>         session.apply(upsert);
>       } catch (Exception e) {
>         logger.error("Exception during upsert:", e);
>       }
>     }
>   }
> }
>
> class KuduConnection {
>   private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>   private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>   private static int ShutdownHookPriority = 100;
>
>   static AsyncKuduClient getAsyncClient(String kuduMaster) {
>     if (!asyncCache.containsKey(kuduMaster)) {
>       AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>       ShutdownHookManager.get().addShutdownHook(new Runnable() {
>         @Override
>         public void run() {
>           try {
>             asyncClient.close();
>           } catch (Exception e) {
>             logger.error("Exception closing async client", e);
>           }
>         }
>       }, ShutdownHookPriority);
>       asyncCache.put(kuduMaster, asyncClient);
>     }
>     return asyncCache.get(kuduMaster);
>   }
> }
>
>
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 16:20, Mike Percy  wrote:
>
>> Hi Ravi, it would be helpful if you could attach what you are getting
>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>> from items in the returned array -- and indicate what you were hoping to
>> get back. Note that a RowError can also return to you the Operation
>> 
>> that you used to generate the

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
Mike,

Thanks for the information. But once the connection to any of the Kudu
servers is lost, there is no way I can get control of the KuduSession
object, and the same goes for getPendingErrors(). The KuduClient in this
case becomes a zombie and never comes back until the connection is
properly re-established. I tried doing all that you suggested, with no
luck. Attaching my KuduClient code.

package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.kudu.client.*;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;

public class KuduProcess {
  private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
  // These must be static, since they are used from the static methods below.
  private static KuduTable table;
  private static KuduSession session;

  public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host,
      String tableName) {
    rdd.foreachPartition(iterator -> {
      RowErrorsAndOverflowStatus errors =
          upsertOpIterator(iterator, tableName, host);
      int errorCount = errors.getRowErrors().length;
      if (errorCount > 0) {
        throw new RuntimeException(
            "Failed to write " + errorCount + " messages into Kudu");
      }
    });
  }

  private static RowErrorsAndOverflowStatus upsertOpIterator(
      Iterator<Map<String, Object>> iter, String tableName, String host) {
    try {
      AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
      KuduClient client = asyncClient.syncClient();
      table = client.openTable(tableName);
      session = client.newSession();
      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
      while (iter.hasNext()) {
        upsertOp(iter.next());
      }
    } catch (KuduException e) {
      logger.error("Exception in upsertOpIterator method", e);
    } finally {
      try {
        if (session != null) {   // guard: session is null if openTable() threw
          session.close();
        }
      } catch (KuduException e) {
        logger.error("Exception in Connection close", e);
      }
    }
    return session.getPendingErrors(); // <-- Once the connection is lost, this
                                       // line never gets called; the Spark job
                                       // keeps running and processing records
                                       // while the KuduClient is trying to
                                       // reconnect to Kudu. Meanwhile, we are
                                       // losing all those records.
  }

  public static void upsertOp(Map<String, Object> formattedMap) {
    if (formattedMap.size() != 0) {
      try {
        Upsert upsert = table.newUpsert();
        PartialRow row = upsert.getRow();
        for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
          if (entry.getValue().getClass().equals(String.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
              row.setNull(entry.getKey());
            else
              row.addString(entry.getKey(), (String) entry.getValue());
          } else if (entry.getValue().getClass().equals(Long.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
              row.setNull(entry.getKey());
            else
              row.addLong(entry.getKey(), (Long) entry.getValue());
          } else if (entry.getValue().getClass().equals(Integer.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
              row.setNull(entry.getKey());
            else
              row.addInt(entry.getKey(), (Integer) entry.getValue());
          }
        }
        session.apply(upsert);
      } catch (Exception e) {
        logger.error("Exception during upsert:", e);
      }
    }
  }
}

class KuduConnection {
  private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
  private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
  private static int ShutdownHookPriority = 100;

  static AsyncKuduClient getAsyncClient(String kuduMaster) {
    if (!asyncCache.containsKey(kuduMaster)) {
      AsyncKuduClient asyncClient =
          new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
      ShutdownHookManager.get().addShutdownHook(new Runnable() {
        @Override
        public void run() {
          try {
            asyncClient.close();
          } catch (Exception e) {
            logger.error("Exception closing async client", e);
          }
        }
      }, ShutdownHookPriority);
      asyncCache.put(kuduMaster, asyncClient);
    }
    return asyncCache.get(kuduMaster);
  }
}
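A side note on the KuduConnection cache above: a plain HashMap is not
thread-safe, and on a Spark executor several task threads can call
getAsyncClient() concurrently, potentially building duplicate clients. A
minimal sketch of the same cache on ConcurrentHashMap.computeIfAbsent (the
generic type C here is a stand-in for AsyncKuduClient, so the pattern can
be shown without a live cluster):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Thread-safe one-client-per-master cache; C stands in for AsyncKuduClient.
public class ClientCache<C> {
    private final Map<String, C> cache = new ConcurrentHashMap<>();
    private final Function<String, C> factory;

    public ClientCache(Function<String, C> factory) {
        this.factory = factory;
    }

    // computeIfAbsent runs the factory at most once per key, even when
    // several partition threads ask for the same master concurrently.
    public C get(String master) {
        return cache.computeIfAbsent(master, factory);
    }
}
```

With this shape the shutdown hook registration would move into the factory
lambda, so it too runs exactly once per master.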



Thanks,
Ravi

Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
Hi Ravi, it would be helpful if you could attach what you are getting back
from getPendingErrors() -- perhaps from dumping RowError.toString() from
items in the returned array -- and indicate what you were hoping to get
back. Note that a RowError can also return to you the Operation that you
used to generate the write. From the Operation, you can get the original
PartialRow object, which should be able to identify the affected row that
the write failed for. Does that help?

Since you are using the Kudu client directly, Spark is not involved from
the Kudu perspective, so you will need to deal with Spark on your own in
that case.

Mike



Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
Hi Mike,

Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.

So, I am trying to use the Kudu client API to perform UPSERTs into Kudu,
and I have integrated this with Spark. I am trying to test the case where
one of the Kudu servers fails. In that case, if there is any problem
writing, getPendingErrors() should give me a way to handle the errors so
that I can cleanly terminate my Spark job. This is what I am trying to do.

But I am not able to get hold of the exceptions being thrown from within
the KuduClient while it retries the connection to the tablet server. My
getPendingErrors() is not catching these exceptions.

Let me know if you need more clarification. I can post some Snippets.

Thanks,
Ravi
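Since the client blocks and retries internally, one workaround is to bound
each blocking call with your own deadline, so a hung openTable() or flush
surfaces as a TimeoutException the Spark task can turn into a failure. A
stdlib-only sketch under that assumption -- WriteGuard and runWithTimeout
are hypothetical names, not part of the Kudu API:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: runs a blocking call under a deadline so a client
// that retries forever surfaces as a TimeoutException instead of a hang.
public class WriteGuard {
    private static final ExecutorService POOL =
        Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setDaemon(true); // guard threads must not keep the JVM alive
            return t;
        });

    public static <T> T runWithTimeout(Callable<T> call, long timeoutMs)
            throws Exception {
        Future<T> f = POOL.submit(call);
        try {
            return f.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            f.cancel(true); // interrupt the hung call; caller fails the task
            throw e;
        }
    }
}
```

A call like WriteGuard.runWithTimeout(() -> client.openTable(tableName),
30_000) would then throw instead of blocking the partition indefinitely.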



Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
Hi Ravi, are you using AUTO_FLUSH_BACKGROUND? You mention that you are
trying to use getPendingErrors() but it sounds like it's not working for
you -- can you be more specific about what you expect and what you are
observing?

Thanks,
Mike





Re: Spark Streaming + Kudu

2018-02-26 Thread Ravi Kanth
Thanks, Clifford. We are running Kudu 1.4. To date we haven't seen any
issues in production and we are not losing tablet servers. But as part of
testing I have to generate a few unforeseen cases to analyse the
application's performance. One of those is intentionally bringing down the
tablet server or master server, during which I observed the loss of
records. Just wanted to test cases outside the happy path here. Once
again, thanks for taking the time to respond.

- Ravi



Re: Spark Streaming + Kudu

2018-02-26 Thread Clifford Resnick
I'll have to get back to you on the code bits, but I'm pretty sure we're doing 
simple sync batching. We're not in production yet, but after some months of 
development I haven't seen any failures, even when pushing load doing multiple 
years' backfill. I think the real question is why you are losing tablet 
servers. The only instability we ever had with Kudu was that weird NTP sync 
issue that was fixed, I think, in 1.6. What version are you running?

Anyway, I would think that infinite loop should be catchable somewhere. Our 
pipeline is set to fail/retry with Flink snapshots; I imagine there is 
something similar in Spark. Sorry I can't be of more help!
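For what it's worth, one generic shape for making that loop catchable is
bounded attempts with exponential backoff, rethrowing the last failure so
the framework's snapshot/checkpoint retry can take over. A stdlib-only
sketch -- BoundedRetry is a made-up helper, not a Flink or Kudu API:

```java
import java.util.concurrent.Callable;

// Bounded retry with exponential backoff: a connection that never recovers
// eventually surfaces as an exception instead of an infinite reconnect loop.
public class BoundedRetry {
    public static <T> T retry(Callable<T> op, int maxAttempts,
                              long initialBackoffMs) throws Exception {
        Exception last = null;
        long backoff = initialBackoffMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoff); // wait before the next attempt
                    backoff *= 2;          // exponential backoff
                }
            }
        }
        throw last; // give up: fail the job and let the framework restart it
    }
}
```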




Re: Spark Streaming + Kudu

2018-02-26 Thread Ravi Kanth
Cliff,

Thanks for the response. Well, I do agree that it's simple and seamless.
In my case, I am able to upsert ~25000 events/sec into Kudu. But I am
facing a problem when any of the Kudu tablet or master servers is down: I
am not able to get hold of the exception from the client. The client goes
into an infinite loop trying to reconnect to Kudu, and meanwhile I am
losing my records. I tried handling the errors through getPendingErrors(),
but it didn't help. I am using AsyncKuduClient to establish the connection
and retrieving the syncClient from the async client to open the session
and table. Any help?

Thanks,
Ravi



Re: Spark Streaming + Kudu

2018-02-26 Thread Cliff Resnick
While I can't speak for Spark, we do use the client API from Flink
streaming and it's simple and seamless. It's especially nice if you require
an Upsert semantic.



Spark Streaming + Kudu

2018-02-26 Thread Ravi Kanth
Hi,

Anyone using Spark Streaming to ingest data into Kudu and using Kudu Client
API to do so rather than the traditional KuduContext API? I am stuck at a
point and couldn't find a solution.

Thanks,
Ravi