Re: spark on kudu performance!

2018-07-05 Thread Todd Lipcon
On Mon, Jun 11, 2018 at 5:52 AM, fengba...@uce.cn  wrote:

> Hi:
>
>  I am following the development documentation on the official Kudu
> website and using Spark to analyze Kudu data (Kudu version 1.6.0):
>
> the official code is:
> val df = sqlContext.read
>   .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
>   .kudu
>
> // Query using the Spark API...
> df.select("id").filter("id >= 5").show()
>
>
> My question  is :
> (1) If I use the code from the official website: when the df is created,
> my table has about 1.8 billion rows, and then the filter on the df is
> applied. Isn't this equivalent to loading 1.8 billion rows into memory
> each time, so the performance is very poor?
>

That's not correct. Data frames are lazily evaluated, so when you use a
filter like the one above, Spark does not fully materialize the whole data
frame in memory before it begins to filter.

You can also use ".explain()" to see whether the filter you are specifying
is getting pushed down properly to Kudu.
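
For reference, a minimal sketch of that check, using the connector and the
names from the snippet above (spark-shell style, where sqlContext is
provided):

import org.apache.kudu.spark.kudu._

val df = sqlContext.read
  .options(Map("kudu.master" -> "kudu.master:7051", "kudu.table" -> "kudu_table"))
  .kudu

// Inspect the printed plan to see which filters are pushed down to the Kudu scan.
df.select("id").filter("id >= 5").explain(true)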


>
> (2) If I create time-based range partitions on the 1.8-billion-row table
> and then use the underlying Java API directly to scan only the relevant
> partitions, wouldn't each query load just the specified partitions
> instead of all 1.8 billion rows?
>
> Please give me some suggestions, thanks!
>
>
The above should happen automatically so long as the filter predicate has
been pushed down. Using 'explain()' and showing us the results, along with
the code you used to create your table, will help us understand what the
performance problem might be.
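
For question (2), a hedged sketch of a predicate-driven scan through the
Java client, written here in Scala (the time column name "event_time" and
the bound values are illustrative assumptions, not taken from your schema):

import org.apache.kudu.client.{KuduClient, KuduPredicate}
import org.apache.kudu.client.KuduPredicate.ComparisonOp

val client = new KuduClient.KuduClientBuilder("kudu.master:7051").build()
val table  = client.openTable("kudu_table")
val tsCol  = table.getSchema.getColumn("event_time")

val scanner = client.newScannerBuilder(table)
  .addPredicate(KuduPredicate.newComparisonPredicate(tsCol, ComparisonOp.GREATER_EQUAL, 1514764800000000L))
  .addPredicate(KuduPredicate.newComparisonPredicate(tsCol, ComparisonOp.LESS, 1517443200000000L))
  .setProjectedColumnNames(java.util.Arrays.asList("id"))
  .build()

// Only tablets whose range overlaps the predicates are scanned.
while (scanner.hasMoreRows) {
  val rows = scanner.nextRows()
  while (rows.hasNext) {
    val row = rows.next()
    // process row, e.g. row.getLong("id") if id is an INT64 column
  }
}
scanner.close()
client.shutdown()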

-Todd
--
Todd Lipcon
Software Engineer, Cloudera


Re: Spark Streaming + Kudu

2018-03-06 Thread Ravi Kanth
Mike - I actually got hold of the PIDs for the Spark executors but am facing
issues running jstack; there are some VM exceptions. I will figure it out
and attach the jstack output. Thanks for your patience.

On 6 March 2018 at 20:42, Mike Percy  wrote:

> Hmm, could you try in spark local mode? i.e. https://jaceklaskowski.gi
> tbooks.io/mastering-apache-spark/content/spark-local.html
>
> Mike
>
> On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth 
> wrote:
>
>> Mike,
>>
>> Can you clarify a bit on grabbing the jstack for the process? I launched
>> my Spark application and tried to get the pid using which I thought I can
>> grab jstack trace during hang. Unfortunately, I am not able to figure out
>> grabbing pid for Spark application.
>>
>> Thanks,
>> Ravi
>>
>> On 6 March 2018 at 18:36, Mike Percy  wrote:
>>
>>> Thanks Ravi. Would you mind attaching the output of jstack on the
>>> process during this hang? That would show what the Kudu client threads are
>>> doing, as what we are seeing here is just the netty boss thread.
>>>
>>> Mike
>>>
>>> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth 
>>> wrote:
>>>

 Yes, I have debugged to find the root cause. Every logger before "table
 = client.openTable(tableName);" is executing fine and exactly at the
 point of opening the table, it is throwing the below exception and nothing
 is being executed after that. Still the Spark batches are being processed
 and at opening the table is failing. I tried catching it with no luck.
 Please find below the exception.

 8/02/23 00:16:30 ERROR client.TabletClient: [Peer
 bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream
 on [id: 0x6e13b01f]
 java.net.ConnectException: Connection refused:
 kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
 at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
 at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
 .java:717)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
 .nio.NioClientBoss.connect(NioClientBoss.java:152)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
 .nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
 .nio.NioClientBoss.process(NioClientBoss.java:79)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
 .nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
 at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
 .nio.NioClientBoss.run(NioClientBoss.java:42)
 at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRen
 amingRunnable.run(ThreadRenamingRunnable.java:108)
 at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.
 DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
 Executor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
 lExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)


 Thanks,
 Ravi

 On 5 March 2018 at 23:52, Mike Percy  wrote:

> Have you considered checking your session error count or pending
> errors in your while loop every so often? Can you identify where your code
> is hanging when the connection is lost (what line)?
>
> Mike
>
> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth 
> wrote:
>
>> In addition to my previous comment, I raised a support ticket for
>> this issue with Cloudera and one of the support person mentioned below,
>>
>> *"Thank you for clarifying, The exceptions are logged but not
>> re-thrown to an upper layer, so that explains why the Spark application 
>> is
>> not aware of the underlying error."*
>>
>> On 5 March 2018 at 21:02, Ravi Kanth  wrote:
>>
>>> Mike,
>>>
>>> Thanks for the information. But, once the connection to any of the
>>> Kudu servers is lost then there is no way I can have a control on the
>>> KuduSession object and so with getPendingErrors(). The KuduClient in 
>>> this
>>> case is becoming a zombie and never returned back till the connection is
>>> properly established. I tried doing all that you have suggested with no
>>> luck. Attaching my KuduClient code.
>>>
>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>
>>> import java.util.HashMap;
>>> import java.util.Iterator;
>>> import java.util.Map;
>>> import org.apache.hadoop.util.ShutdownHookManager;
>>> import org.apache.kudu.client.*;
>>> import org.apache.spark.api.java.JavaRDD;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> import 

Re: Spark Streaming + Kudu

2018-03-06 Thread Mike Percy
Hmm, could you try in Spark local mode? i.e.
https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
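
For anyone following along, a minimal sketch of forcing local mode from
code (an assumption that Spark 2.x is in use; with spark-submit the
equivalent is passing --master local[*]):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kudu-streaming-debug")
  .master("local[*]")   // driver and executors share one local JVM, so a single jstack captures everything
  .getOrCreate()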

Mike

On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth  wrote:

> Mike,
>
> Can you clarify a bit on grabbing the jstack for the process? I launched
> my Spark application and tried to get the pid using which I thought I can
> grab jstack trace during hang. Unfortunately, I am not able to figure out
> grabbing pid for Spark application.
>
> Thanks,
> Ravi
>
> On 6 March 2018 at 18:36, Mike Percy  wrote:
>
>> Thanks Ravi. Would you mind attaching the output of jstack on the process
>> during this hang? That would show what the Kudu client threads are doing,
>> as what we are seeing here is just the netty boss thread.
>>
>> Mike
>>
>> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth 
>> wrote:
>>
>>>
>>> Yes, I have debugged to find the root cause. Every logger before "table
>>> = client.openTable(tableName);" is executing fine and exactly at the
>>> point of opening the table, it is throwing the below exception and nothing
>>> is being executed after that. Still the Spark batches are being processed
>>> and at opening the table is failing. I tried catching it with no luck.
>>> Please find below the exception.
>>>
>>> 8/02/23 00:16:30 ERROR client.TabletClient: [Peer
>>> bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream
>>> on [id: 0x6e13b01f]
>>> java.net.ConnectException: Connection refused:
>>> kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>>> .java:717)
>>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>>> .nio.NioClientBoss.connect(NioClientBoss.java:152)
>>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>>> .nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>>> .nio.NioClientBoss.process(NioClientBoss.java:79)
>>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>>> .nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>>> .nio.NioClientBoss.run(NioClientBoss.java:42)
>>> at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRen
>>> amingRunnable.run(ThreadRenamingRunnable.java:108)
>>> at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.
>>> DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>>> Executor.java:1142)
>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>>> lExecutor.java:617)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 5 March 2018 at 23:52, Mike Percy  wrote:
>>>
 Have you considered checking your session error count or pending errors
 in your while loop every so often? Can you identify where your code is
 hanging when the connection is lost (what line)?

 Mike

 On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth 
 wrote:

> In addition to my previous comment, I raised a support ticket for this
> issue with Cloudera and one of the support person mentioned below,
>
> *"Thank you for clarifying, The exceptions are logged but not
> re-thrown to an upper layer, so that explains why the Spark application is
> not aware of the underlying error."*
>
> On 5 March 2018 at 21:02, Ravi Kanth  wrote:
>
>> Mike,
>>
>> Thanks for the information. But, once the connection to any of the
>> Kudu servers is lost then there is no way I can have a control on the
>> KuduSession object and so with getPendingErrors(). The KuduClient in this
>> case is becoming a zombie and never returned back till the connection is
>> properly established. I tried doing all that you have suggested with no
>> luck. Attaching my KuduClient code.
>>
>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>
>> import java.util.HashMap;
>> import java.util.Iterator;
>> import java.util.Map;
>> import org.apache.hadoop.util.ShutdownHookManager;
>> import org.apache.kudu.client.*;
>> import org.apache.spark.api.java.JavaRDD;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialN
>> ullConstants;
>>
>> public class KuduProcess {
>> private static Logger logger = LoggerFactory.getLogger(KuduPr
>> ocess.class);
>> private KuduTable table;
>> private KuduSession session;
>>
>> public static void upsertKudu(JavaRDD> rdd,
>> String host, String tableName) {

Re: Spark Streaming + Kudu

2018-03-06 Thread Ravi Kanth
Mike,

Can you clarify a bit how to grab the jstack for the process? I launched my
Spark application and tried to get the PID, with which I thought I could
grab a jstack trace during the hang. Unfortunately, I am not able to figure
out how to get the PID of the Spark application.

Thanks,
Ravi

On 6 March 2018 at 18:36, Mike Percy  wrote:

> Thanks Ravi. Would you mind attaching the output of jstack on the process
> during this hang? That would show what the Kudu client threads are doing,
> as what we are seeing here is just the netty boss thread.
>
> Mike
>
> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth 
> wrote:
>
>>
>> Yes, I have debugged to find the root cause. Every logger before "table
>> = client.openTable(tableName);" is executing fine and exactly at the
>> point of opening the table, it is throwing the below exception and nothing
>> is being executed after that. Still the Spark batches are being processed
>> and at opening the table is failing. I tried catching it with no luck.
>> Please find below the exception.
>>
>> 8/02/23 00:16:30 ERROR client.TabletClient: [Peer
>> bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream
>> on [id: 0x6e13b01f]
>> java.net.ConnectException: Connection refused:
>> kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>> at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>> at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl
>> .java:717)
>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>> .nio.NioClientBoss.connect(NioClientBoss.java:152)
>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>> .nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>> .nio.NioClientBoss.process(NioClientBoss.java:79)
>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>> .nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>> at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket
>> .nio.NioClientBoss.run(NioClientBoss.java:42)
>> at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRen
>> amingRunnable.run(ThreadRenamingRunnable.java:108)
>> at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.
>> DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPool
>> Executor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoo
>> lExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> Thanks,
>> Ravi
>>
>> On 5 March 2018 at 23:52, Mike Percy  wrote:
>>
>>> Have you considered checking your session error count or pending errors
>>> in your while loop every so often? Can you identify where your code is
>>> hanging when the connection is lost (what line)?
>>>
>>> Mike
>>>
>>> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth 
>>> wrote:
>>>
 In addition to my previous comment, I raised a support ticket for this
 issue with Cloudera and one of the support person mentioned below,

 *"Thank you for clarifying, The exceptions are logged but not re-thrown
 to an upper layer, so that explains why the Spark application is not aware
 of the underlying error."*

 On 5 March 2018 at 21:02, Ravi Kanth  wrote:

> Mike,
>
> Thanks for the information. But, once the connection to any of the
> Kudu servers is lost then there is no way I can have a control on the
> KuduSession object and so with getPendingErrors(). The KuduClient in this
> case is becoming a zombie and never returned back till the connection is
> properly established. I tried doing all that you have suggested with no
> luck. Attaching my KuduClient code.
>
> package org.dwh.streaming.kudu.sparkkudustreaming;
>
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.Map;
> import org.apache.hadoop.util.ShutdownHookManager;
> import org.apache.kudu.client.*;
> import org.apache.spark.api.java.JavaRDD;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialN
> ullConstants;
>
> public class KuduProcess {
> private static Logger logger = LoggerFactory.getLogger(KuduPr
> ocess.class);
> private KuduTable table;
> private KuduSession session;
>
> public static void upsertKudu(JavaRDD> rdd, String
> host, String tableName) {
> rdd.foreachPartition(iterator -> {
> RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator,
> tableName, host);
> int errorCount = errors.getRowErrors().length;
> if(errorCount > 0){
> throw new RuntimeException("Failed to write " + errorCount + "
> messages into Kudu");
> }
> });
> }

Re: Spark Streaming + Kudu

2018-03-06 Thread Ravi Kanth
Yes, I have debugged to find the root cause. Every log statement before
"table = client.openTable(tableName);" executes fine; exactly at the point
of opening the table, the exception below is thrown and nothing after that
is executed. The Spark batches are still being processed, but opening the
table keeps failing. I tried catching the exception, with no luck. Please
find the exception below.

8/02/23 00:16:30 ERROR client.TabletClient: [Peer
bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on
[id: 0x6e13b01f]
java.net.ConnectException: Connection refused:
kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Thanks,
Ravi

On 5 March 2018 at 23:52, Mike Percy  wrote:

> Have you considered checking your session error count or pending errors in
> your while loop every so often? Can you identify where your code is hanging
> when the connection is lost (what line)?
>
> Mike
>
> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth 
> wrote:
>
>> In addition to my previous comment, I raised a support ticket for this
>> issue with Cloudera and one of the support person mentioned below,
>>
>> *"Thank you for clarifying, The exceptions are logged but not re-thrown
>> to an upper layer, so that explains why the Spark application is not aware
>> of the underlying error."*
>>
>> On 5 March 2018 at 21:02, Ravi Kanth  wrote:
>>
>>> Mike,
>>>
>>> Thanks for the information. But, once the connection to any of the Kudu
>>> servers is lost then there is no way I can have a control on the
>>> KuduSession object and so with getPendingErrors(). The KuduClient in this
>>> case is becoming a zombie and never returned back till the connection is
>>> properly established. I tried doing all that you have suggested with no
>>> luck. Attaching my KuduClient code.
>>>
>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>
>>> import java.util.HashMap;
>>> import java.util.Iterator;
>>> import java.util.Map;
>>> import org.apache.hadoop.util.ShutdownHookManager;
>>> import org.apache.kudu.client.*;
>>> import org.apache.spark.api.java.JavaRDD;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialN
>>> ullConstants;
>>>
>>> public class KuduProcess {
>>> private static Logger logger = LoggerFactory.getLogger(KuduPr
>>> ocess.class);
>>> private KuduTable table;
>>> private KuduSession session;
>>>
>>> public static void upsertKudu(JavaRDD> rdd, String
>>> host, String tableName) {
>>> rdd.foreachPartition(iterator -> {
>>> RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator,
>>> tableName, host);
>>> int errorCount = errors.getRowErrors().length;
>>> if(errorCount > 0){
>>> throw new RuntimeException("Failed to write " + errorCount + " messages
>>> into Kudu");
>>> }
>>> });
>>> }
>>> private static RowErrorsAndOverflowStatus 
>>> upsertOpIterator(Iterator>> Object>> iter, String tableName, String host) {
>>> try {
>>> AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>> KuduClient client = asyncClient.syncClient();
>>> table = client.openTable(tableName);
>>> session = client.newSession();
>>> session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLU
>>> SH_BACKGROUND);
>>> while (iter.hasNext()) {
>>> upsertOp(iter.next());
>>> }
>>> } catch (KuduException e) {
>>> logger.error("Exception in upsertOpIterator method", e);
>>> }
>>> finally{
>>> try {
>>> session.close();
>>> } catch (KuduException e) {
>>> logger.error("Exception in Connection close", e);
>>> }
>>> }
>>> return session.getPendingErrors();-> Once,
>>> the connection is lost, this part of the code never gets called and the
>>> Spark 

Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
Have you considered checking your session error count or pending errors in
your while loop every so often? Can you identify where your code is hanging
when the connection is lost (what line)?
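
A minimal sketch of such a periodic check, written in Scala against the
Kudu Java client (the helper name and the choice to throw are assumptions,
not something prescribed in this thread):

import org.apache.kudu.client.KuduSession
import org.slf4j.LoggerFactory

object PendingErrorCheck {
  private val logger = LoggerFactory.getLogger(getClass)

  // Intended to be called every N applied operations inside the upsert loop:
  // surfaces row errors accumulated by the background flusher instead of
  // silently dropping them.
  def failFastOnPendingErrors(session: KuduSession): Unit = {
    if (session.countPendingErrors() > 0) {
      val pending = session.getPendingErrors
      pending.getRowErrors.foreach(err => logger.error(s"Kudu row error: $err"))
      if (pending.isOverflowed)
        logger.error("Row error buffer overflowed; some errors were dropped")
      throw new RuntimeException(
        s"${pending.getRowErrors.length} pending Kudu row errors; aborting this batch")
    }
  }
}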

Mike

On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth  wrote:

> In addition to my previous comment, I raised a support ticket for this
> issue with Cloudera and one of the support person mentioned below,
>
> *"Thank you for clarifying, The exceptions are logged but not re-thrown to
> an upper layer, so that explains why the Spark application is not aware of
> the underlying error."*
>
> On 5 March 2018 at 21:02, Ravi Kanth  wrote:
>
>> Mike,
>>
>> Thanks for the information. But, once the connection to any of the Kudu
>> servers is lost then there is no way I can have a control on the
>> KuduSession object and so with getPendingErrors(). The KuduClient in this
>> case is becoming a zombie and never returned back till the connection is
>> properly established. I tried doing all that you have suggested with no
>> luck. Attaching my KuduClient code.
>>
>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>
>> import java.util.HashMap;
>> import java.util.Iterator;
>> import java.util.Map;
>> import org.apache.hadoop.util.ShutdownHookManager;
>> import org.apache.kudu.client.*;
>> import org.apache.spark.api.java.JavaRDD;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialN
>> ullConstants;
>>
>> public class KuduProcess {
>> private static Logger logger = LoggerFactory.getLogger(KuduPr
>> ocess.class);
>> private KuduTable table;
>> private KuduSession session;
>>
>> public static void upsertKudu(JavaRDD> rdd, String
>> host, String tableName) {
>> rdd.foreachPartition(iterator -> {
>> RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName,
>> host);
>> int errorCount = errors.getRowErrors().length;
>> if(errorCount > 0){
>> throw new RuntimeException("Failed to write " + errorCount + " messages
>> into Kudu");
>> }
>> });
>> }
>> private static RowErrorsAndOverflowStatus 
>> upsertOpIterator(Iterator> Object>> iter, String tableName, String host) {
>> try {
>> AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>> KuduClient client = asyncClient.syncClient();
>> table = client.openTable(tableName);
>> session = client.newSession();
>> session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLU
>> SH_BACKGROUND);
>> while (iter.hasNext()) {
>> upsertOp(iter.next());
>> }
>> } catch (KuduException e) {
>> logger.error("Exception in upsertOpIterator method", e);
>> }
>> finally{
>> try {
>> session.close();
>> } catch (KuduException e) {
>> logger.error("Exception in Connection close", e);
>> }
>> }
>> return session.getPendingErrors();-> Once,
>> the connection is lost, this part of the code never gets called and the
>> Spark job will keep on running and processing the records while the
>> KuduClient is trying to connect to Kudu. Meanwhile, we are loosing all the
>> records.
>> }
>> public static void upsertOp(Map formattedMap) {
>> if (formattedMap.size() != 0) {
>> try {
>> Upsert upsert = table.newUpsert();
>> PartialRow row = upsert.getRow();
>> for (Map.Entry entry : formattedMap.entrySet()) {
>> if (entry.getValue().getClass().equals(String.class)) {
>> if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>> row.setNull(entry.getKey());
>> else
>> row.addString(entry.getKey(), (String) entry.getValue());
>> } else if (entry.getValue().getClass().equals(Long.class)) {
>> if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>> row.setNull(entry.getKey());
>> else
>> row.addLong(entry.getKey(), (Long) entry.getValue());
>> } else if (entry.getValue().getClass().equals(Integer.class)) {
>> if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>> row.setNull(entry.getKey());
>> else
>> row.addInt(entry.getKey(), (Integer) entry.getValue());
>> }
>> }
>>
>> session.apply(upsert);
>> } catch (Exception e) {
>> logger.error("Exception during upsert:", e);
>> }
>> }
>> }
>> }
>> class KuduConnection {
>> private static Logger logger = LoggerFactory.getLogger(KuduCo
>> nnection.class);
>> private static Map asyncCache = new HashMap<>();
>> private static int ShutdownHookPriority = 100;
>>
>> static AsyncKuduClient getAsyncClient(String kuduMaster) {
>> if (!asyncCache.containsKey(kuduMaster)) {
>> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClien
>> tBuilder(kuduMaster).build();
>> ShutdownHookManager.get().addShutdownHook(new Runnable() {
>> @Override
>> public void run() {
>> try {
>> asyncClient.close();
>> } catch (Exception e) {
>> logger.error("Exception closing async client", e);
>> }
>> }
>> }, ShutdownHookPriority);
>> asyncCache.put(kuduMaster, asyncClient);
>> }
>> return 

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
In addition to my previous comment, I raised a support ticket for this
issue with Cloudera, and one of the support engineers responded as below:

*"Thank you for clarifying. The exceptions are logged but not re-thrown to
an upper layer, so that explains why the Spark application is not aware of
the underlying error."*

On 5 March 2018 at 21:02, Ravi Kanth  wrote:

> Mike,
>
> Thanks for the information. But, once the connection to any of the Kudu
> servers is lost then there is no way I can have a control on the
> KuduSession object and so with getPendingErrors(). The KuduClient in this
> case is becoming a zombie and never returned back till the connection is
> properly established. I tried doing all that you have suggested with no
> luck. Attaching my KuduClient code.
>
> package org.dwh.streaming.kudu.sparkkudustreaming;
>
> import java.util.HashMap;
> import java.util.Iterator;
> import java.util.Map;
> import org.apache.hadoop.util.ShutdownHookManager;
> import org.apache.kudu.client.*;
> import org.apache.spark.api.java.JavaRDD;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> import org.dwh.streaming.kudu.sparkkudustreaming.constants.
> SpecialNullConstants;
>
> public class KuduProcess {
> private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
> private KuduTable table;
> private KuduSession session;
>
> public static void upsertKudu(JavaRDD> rdd, String
> host, String tableName) {
> rdd.foreachPartition(iterator -> {
> RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName,
> host);
> int errorCount = errors.getRowErrors().length;
> if(errorCount > 0){
> throw new RuntimeException("Failed to write " + errorCount + " messages
> into Kudu");
> }
> });
> }
> private static RowErrorsAndOverflowStatus 
> upsertOpIterator(Iterator Object>> iter, String tableName, String host) {
> try {
> AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
> KuduClient client = asyncClient.syncClient();
> table = client.openTable(tableName);
> session = client.newSession();
> session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_
> BACKGROUND);
> while (iter.hasNext()) {
> upsertOp(iter.next());
> }
> } catch (KuduException e) {
> logger.error("Exception in upsertOpIterator method", e);
> }
> finally{
> try {
> session.close();
> } catch (KuduException e) {
> logger.error("Exception in Connection close", e);
> }
> }
> return session.getPendingErrors();-> Once,
> the connection is lost, this part of the code never gets called and the
> Spark job will keep on running and processing the records while the
> KuduClient is trying to connect to Kudu. Meanwhile, we are loosing all the
> records.
> }
> public static void upsertOp(Map formattedMap) {
> if (formattedMap.size() != 0) {
> try {
> Upsert upsert = table.newUpsert();
> PartialRow row = upsert.getRow();
> for (Map.Entry entry : formattedMap.entrySet()) {
> if (entry.getValue().getClass().equals(String.class)) {
> if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
> row.setNull(entry.getKey());
> else
> row.addString(entry.getKey(), (String) entry.getValue());
> } else if (entry.getValue().getClass().equals(Long.class)) {
> if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
> row.setNull(entry.getKey());
> else
> row.addLong(entry.getKey(), (Long) entry.getValue());
> } else if (entry.getValue().getClass().equals(Integer.class)) {
> if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
> row.setNull(entry.getKey());
> else
> row.addInt(entry.getKey(), (Integer) entry.getValue());
> }
> }
>
> session.apply(upsert);
> } catch (Exception e) {
> logger.error("Exception during upsert:", e);
> }
> }
> }
> }
> class KuduConnection {
> private static Logger logger = LoggerFactory.getLogger(
> KuduConnection.class);
> private static Map asyncCache = new HashMap<>();
> private static int ShutdownHookPriority = 100;
>
> static AsyncKuduClient getAsyncClient(String kuduMaster) {
> if (!asyncCache.containsKey(kuduMaster)) {
> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
> kuduMaster).build();
> ShutdownHookManager.get().addShutdownHook(new Runnable() {
> @Override
> public void run() {
> try {
> asyncClient.close();
> } catch (Exception e) {
> logger.error("Exception closing async client", e);
> }
> }
> }, ShutdownHookPriority);
> asyncCache.put(kuduMaster, asyncClient);
> }
> return asyncCache.get(kuduMaster);
> }
> }
>
>
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 16:20, Mike Percy  wrote:
>
>> Hi Ravi, it would be helpful if you could attach what you are getting
>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>> from items in the returned array -- and indicate what you were hoping to
>> get back. Note that a RowError can also return to you the Operation

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
Mike,

Thanks for the information. But once the connection to any of the Kudu
servers is lost, there is no way I can get control of the KuduSession
object, and so of getPendingErrors() either. The KuduClient in this case
becomes a zombie and never returns until the connection is properly
re-established. I tried everything you suggested, with no luck. Attaching
my KuduClient code.

package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.kudu.client.*;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;

public class KuduProcess {
  private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
  private static KuduTable table;
  private static KuduSession session;

  public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host, String tableName) {
    rdd.foreachPartition(iterator -> {
      RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator, tableName, host);
      int errorCount = errors.getRowErrors().length;
      if (errorCount > 0) {
        throw new RuntimeException("Failed to write " + errorCount + " messages into Kudu");
      }
    });
  }

  private static RowErrorsAndOverflowStatus upsertOpIterator(
      Iterator<Map<String, Object>> iter, String tableName, String host) {
    try {
      AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
      KuduClient client = asyncClient.syncClient();
      table = client.openTable(tableName);
      session = client.newSession();
      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
      while (iter.hasNext()) {
        upsertOp(iter.next());
      }
    } catch (KuduException e) {
      logger.error("Exception in upsertOpIterator method", e);
    } finally {
      try {
        session.close();
      } catch (KuduException e) {
        logger.error("Exception in Connection close", e);
      }
    }
    // Once the connection is lost, this line never gets called: the Spark job
    // keeps running and processing records while the KuduClient is trying to
    // connect to Kudu. Meanwhile, we are losing all the records.
    return session.getPendingErrors();
  }

  public static void upsertOp(Map<String, Object> formattedMap) {
    if (formattedMap.size() != 0) {
      try {
        Upsert upsert = table.newUpsert();
        PartialRow row = upsert.getRow();
        for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
          if (entry.getValue().getClass().equals(String.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
              row.setNull(entry.getKey());
            else
              row.addString(entry.getKey(), (String) entry.getValue());
          } else if (entry.getValue().getClass().equals(Long.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
              row.setNull(entry.getKey());
            else
              row.addLong(entry.getKey(), (Long) entry.getValue());
          } else if (entry.getValue().getClass().equals(Integer.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
              row.setNull(entry.getKey());
            else
              row.addInt(entry.getKey(), (Integer) entry.getValue());
          }
        }

        session.apply(upsert);
      } catch (Exception e) {
        logger.error("Exception during upsert:", e);
      }
    }
  }
}

class KuduConnection {
  private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
  private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
  private static int ShutdownHookPriority = 100;

  static AsyncKuduClient getAsyncClient(String kuduMaster) {
    if (!asyncCache.containsKey(kuduMaster)) {
      AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
      ShutdownHookManager.get().addShutdownHook(new Runnable() {
        @Override
        public void run() {
          try {
            asyncClient.close();
          } catch (Exception e) {
            logger.error("Exception closing async client", e);
          }
        }
      }, ShutdownHookPriority);
      asyncCache.put(kuduMaster, asyncClient);
    }
    return asyncCache.get(kuduMaster);
  }
}
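
One knob that may be relevant here, offered as an assumption rather than a
confirmed fix: the client builder can bound how long operations keep
retrying, so a lost tablet server surfaces as a timeout error instead of an
apparently hung client. A sketch in Scala:

import org.apache.kudu.client.AsyncKuduClient

// Timeout values are illustrative, not recommendations.
val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder("kudu-master:7051")
  .defaultOperationTimeoutMs(30000)       // writes and scans
  .defaultAdminOperationTimeoutMs(30000)  // openTable and other admin calls
  .build()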



Thanks,
Ravi

On 5 March 2018 at 16:20, Mike Percy  wrote:

> Hi Ravi, it would be helpful if you could attach what you are getting back
> from getPendingErrors() -- perhaps from dumping RowError.toString() from
> items in the returned array -- and indicate what you were hoping to get
> back. Note that a RowError can also return to you the Operation
> 
> that you used to generate the write. From the Operation, you can get the
> original PartialRow
> 
> object, which should be able to identify the affected row that the write
> failed for. Does that help?
>
> Since you are using the Kudu client directly, Spark is not involved from
> the Kudu perspective, so you will need to deal with Spark on your own in
> that case.
>
> Mike
>
> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth 
> 

Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
Hi Ravi, it would be helpful if you could attach what you are getting back
from getPendingErrors() -- perhaps from dumping RowError.toString() from
items in the returned array -- and indicate what you were hoping to get
back. Note that a RowError can also return to you the Operation that you
used to generate the write. From the Operation, you can get the original
PartialRow object, which should be able to identify the affected row that
the write failed for. Does that help?
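
A hedged sketch of dumping that information, written in Scala against the
Kudu Java client (the logger setup and log format are assumptions):

import org.apache.kudu.client.KuduSession
import org.slf4j.LoggerFactory

object RowErrorDump {
  private val logger = LoggerFactory.getLogger(getClass)

  // Logs each pending RowError together with the row of the Operation that failed.
  def dumpPendingErrors(session: KuduSession): Unit = {
    session.getPendingErrors.getRowErrors.foreach { err =>
      val failedRow = err.getOperation.getRow   // the original PartialRow of the failed write
      logger.error(s"Kudu write failed: $err (row: $failedRow)")
    }
  }
}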

Since you are using the Kudu client directly, Spark is not involved from
the Kudu perspective, so you will need to deal with Spark on your own in
that case.

Mike

On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth  wrote:

> Hi Mike,
>
> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>
> So, I am trying to use Kudu Client API to perform UPSERT into Kudu and I
> integrated this with Spark. I am trying to test a case where in if any of
> Kudu server fails. So, in this case, if there is any problem in writing,
> getPendingErrors() should give me a way to handle these errors so that I
> can successfully terminate my Spark Job. This is what I am trying to do.
>
> But, I am not able to get a hold of the exceptions being thrown from with
> in the KuduClient when retrying to connect to Tablet Server. My
> getPendingErrors is not getting ahold of these exceptions.
>
> Let me know if you need more clarification. I can post some Snippets.
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 13:18, Mike Percy  wrote:
>
>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>> ?
>> You mention that you are trying to use getPendingErrors()
>> 
>>  but
>> it sounds like it's not working for you -- can you be more specific about
>> what you expect and what you are observing?
>>
>> Thanks,
>> Mike
>>
>>
>>
>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth 
>> wrote:
>>
>>> Thank Clifford. We are running Kudu 1.4 version. Till date we didn't see
>>> any issues in production and we are not losing tablet servers. But, as part
>>> of testing I have to generate few unforeseen cases to analyse the
>>> application performance. One among that is bringing down the tablet server
>>> or master server intentionally during which I observed the loss of records.
>>> Just wanted to test cases out of the happy path here. Once again thanks for
>>> taking time to respond to me.
>>>
>>> - Ravi
>>>
>>> On 26 February 2018 at 19:58, Clifford Resnick 
>>> wrote:
>>>
 I'll have to get back to you on the code bits, but I'm pretty sure
 we're doing simple sync batching. We're not in production yet, but after
 some months of development I haven't seen any failures, even when pushing
 load doing multiple years' backfill. I think the real question is why are
 you losing tablet servers? The only instability we ever had with Kudu was
 when it had that weird ntp sync issue that was fixed I think for 1.6. What
 version are you running?

 Anyway I would think that infinite loop should be catchable somewhere.
 Our pipeline is set to fail/retry with Flink snapshots. I imagine there is
 similar with Spark. Sorry I cant be of more help!



 On Feb 26, 2018 9:10 PM, Ravi Kanth  wrote:

 Cliff,

 Thanks for the response. Well, I do agree that its simple and seamless.
 In my case, I am able to upsert ~25000 events/sec into Kudu. But, I am
 facing the problem when any of the Kudu Tablet or master server is down. I
 am not able to get a hold of the exception from client. The client is going
 into an infinite loop trying to connect to Kudu. Meanwhile, I am loosing my
 records. I tried handling the errors through getPendingErrors() but still
 it is helpless. I am using AsyncKuduClient to establish the connection and
 retrieving the syncClient from the Async to open the session and table. Any
 help?

 Thanks,
 Ravi

 On 26 February 2018 at 18:00, Cliff Resnick  wrote:

 While I can't speak for Spark, we do use the client API from Flink
 streaming and it's simple and seamless. It's especially nice if you require
 an Upsert semantic.

 On Feb 26, 2018 7:51 PM, "Ravi Kanth"  wrote:

 Hi,

 Anyone using Spark Streaming to ingest data into Kudu and using Kudu
 Client API to do so rather than the traditional KuduContext API? I am stuck
 at a point and couldn't find a solution.

 

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
Hi Mike,

Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.

So, I am trying to use the Kudu client API to perform upserts into Kudu, and
I have integrated this with Spark. I am trying to test the case where one of
the Kudu servers fails. In that case, if there is any problem writing,
getPendingErrors() should give me a way to handle those errors so that I can
cleanly terminate my Spark job. This is what I am trying to do.

But I am not able to get hold of the exceptions thrown from within the
KuduClient while it retries connecting to the tablet server; my
getPendingErrors() call is not capturing them.

Let me know if you need more clarification. I can post some snippets.

Thanks,
Ravi

On 5 March 2018 at 13:18, Mike Percy  wrote:

> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
> ?
> You mention that you are trying to use getPendingErrors()
> 
>  but
> it sounds like it's not working for you -- can you be more specific about
> what you expect and what you are observing?
>
> Thanks,
> Mike
>
>
>
> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth 
> wrote:
>
>> Thank Clifford. We are running Kudu 1.4 version. Till date we didn't see
>> any issues in production and we are not losing tablet servers. But, as part
>> of testing I have to generate few unforeseen cases to analyse the
>> application performance. One among that is bringing down the tablet server
>> or master server intentionally during which I observed the loss of records.
>> Just wanted to test cases out of the happy path here. Once again thanks for
>> taking time to respond to me.
>>
>> - Ravi
>>
>> On 26 February 2018 at 19:58, Clifford Resnick 
>> wrote:
>>
>>> I'll have to get back to you on the code bits, but I'm pretty sure we're
>>> doing simple sync batching. We're not in production yet, but after some
>>> months of development I haven't seen any failures, even when pushing load
>>> doing multiple years' backfill. I think the real question is why are you
>>> losing tablet servers? The only instability we ever had with Kudu was when
>>> it had that weird ntp sync issue that was fixed I think for 1.6. What
>>> version are you running?
>>>
>>> Anyway I would think that infinite loop should be catchable somewhere.
>>> Our pipeline is set to fail/retry with Flink snapshots. I imagine there is
>>> similar with Spark. Sorry I cant be of more help!
>>>
>>>
>>>
>>> On Feb 26, 2018 9:10 PM, Ravi Kanth  wrote:
>>>
>>> Cliff,
>>>
>>> Thanks for the response. Well, I do agree that its simple and seamless.
>>> In my case, I am able to upsert ~25000 events/sec into Kudu. But, I am
>>> facing the problem when any of the Kudu Tablet or master server is down. I
>>> am not able to get a hold of the exception from client. The client is going
>>> into an infinite loop trying to connect to Kudu. Meanwhile, I am loosing my
>>> records. I tried handling the errors through getPendingErrors() but still
>>> it is helpless. I am using AsyncKuduClient to establish the connection and
>>> retrieving the syncClient from the Async to open the session and table. Any
>>> help?
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 26 February 2018 at 18:00, Cliff Resnick  wrote:
>>>
>>> While I can't speak for Spark, we do use the client API from Flink
>>> streaming and it's simple and seamless. It's especially nice if you require
>>> an Upsert semantic.
>>>
>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth"  wrote:
>>>
>>> Hi,
>>>
>>> Anyone using Spark Streaming to ingest data into Kudu and using Kudu
>>> Client API to do so rather than the traditional KuduContext API? I am stuck
>>> at a point and couldn't find a solution.
>>>
>>> Thanks,
>>> Ravi
>>>
>>>
>>>
>>>
>>
>


Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
Hi Ravi, are you using AUTO_FLUSH_BACKGROUND? You mention that you are
trying to use getPendingErrors() but it sounds like it's not working for
you -- can you be more specific about what you expect and what you are
observing?
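
For context, a minimal sketch of that flush mode in use, written in Scala
against the Kudu Java client: apply() only buffers the write, so pending
errors are collected after a flush (the master address, table, and column
names are assumptions):

import org.apache.kudu.client.{KuduClient, SessionConfiguration}

val client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()
val table = client.openTable("my_table")
val session = client.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)

val upsert = table.newUpsert()
upsert.getRow.addString("id", "row-1")
session.apply(upsert)   // returns immediately; the write happens in the background

session.flush()         // force buffered operations to resolve
session.getPendingErrors.getRowErrors.foreach(err => println(s"row error: $err"))
session.close()
client.shutdown()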

Thanks,
Mike



On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth  wrote:

> Thank Clifford. We are running Kudu 1.4 version. Till date we didn't see
> any issues in production and we are not losing tablet servers. But, as part
> of testing I have to generate few unforeseen cases to analyse the
> application performance. One among that is bringing down the tablet server
> or master server intentionally during which I observed the loss of records.
> Just wanted to test cases out of the happy path here. Once again thanks for
> taking time to respond to me.
>
> - Ravi
>
> On 26 February 2018 at 19:58, Clifford Resnick 
> wrote:
>
>> I'll have to get back to you on the code bits, but I'm pretty sure we're
>> doing simple sync batching. We're not in production yet, but after some
>> months of development I haven't seen any failures, even when pushing load
>> doing multiple years' backfill. I think the real question is why are you
>> losing tablet servers? The only instability we ever had with Kudu was when
>> it had that weird ntp sync issue that was fixed I think for 1.6. What
>> version are you running?
>>
>> Anyway I would think that infinite loop should be catchable somewhere.
>> Our pipeline is set to fail/retry with Flink snapshots. I imagine there is
>> similar with Spark. Sorry I cant be of more help!
>>
>>
>>
>> On Feb 26, 2018 9:10 PM, Ravi Kanth  wrote:
>>
>> Cliff,
>>
>> Thanks for the response. Well, I do agree that its simple and seamless.
>> In my case, I am able to upsert ~25000 events/sec into Kudu. But, I am
>> facing the problem when any of the Kudu Tablet or master server is down. I
>> am not able to get a hold of the exception from client. The client is going
>> into an infinite loop trying to connect to Kudu. Meanwhile, I am loosing my
>> records. I tried handling the errors through getPendingErrors() but still
>> it is helpless. I am using AsyncKuduClient to establish the connection and
>> retrieving the syncClient from the Async to open the session and table. Any
>> help?
>>
>> Thanks,
>> Ravi
>>
>> On 26 February 2018 at 18:00, Cliff Resnick  wrote:
>>
>> While I can't speak for Spark, we do use the client API from Flink
>> streaming and it's simple and seamless. It's especially nice if you require
>> an Upsert semantic.
>>
>> On Feb 26, 2018 7:51 PM, "Ravi Kanth"  wrote:
>>
>> Hi,
>>
>> Anyone using Spark Streaming to ingest data into Kudu and using Kudu
>> Client API to do so rather than the traditional KuduContext API? I am stuck
>> at a point and couldn't find a solution.
>>
>> Thanks,
>> Ravi
>>
>>
>>
>>
>


Re: Spark Streaming + Kudu

2018-02-26 Thread Ravi Kanth
Thanks, Clifford. We are running Kudu 1.4. To date we haven't seen any
issues in production, and we are not losing tablet servers. But as part of
testing I have to generate a few unforeseen cases to analyse the
application's performance. One of them is intentionally bringing down a
tablet server or master server, during which I observed the loss of
records. I just wanted to test cases outside the happy path here. Once
again, thanks for taking the time to respond.

- Ravi

On 26 February 2018 at 19:58, Clifford Resnick 
wrote:

> I'll have to get back to you on the code bits, but I'm pretty sure we're
> doing simple sync batching. We're not in production yet, but after some
> months of development I haven't seen any failures, even when pushing load
> doing multiple years' backfill. I think the real question is why are you
> losing tablet servers? The only instability we ever had with Kudu was when
> it had that weird ntp sync issue that was fixed I think for 1.6. What
> version are you running?
>
> Anyway I would think that infinite loop should be catchable somewhere. Our
> pipeline is set to fail/retry with Flink snapshots. I imagine there is
> similar with Spark. Sorry I cant be of more help!
>
>
>
> On Feb 26, 2018 9:10 PM, Ravi Kanth  wrote:
>
> Cliff,
>
> Thanks for the response. Well, I do agree that its simple and seamless. In
> my case, I am able to upsert ~25000 events/sec into Kudu. But, I am facing
> the problem when any of the Kudu Tablet or master server is down. I am not
> able to get a hold of the exception from client. The client is going into
> an infinite loop trying to connect to Kudu. Meanwhile, I am loosing my
> records. I tried handling the errors through getPendingErrors() but still
> it is helpless. I am using AsyncKuduClient to establish the connection and
> retrieving the syncClient from the Async to open the session and table. Any
> help?
>
> Thanks,
> Ravi
>
> On 26 February 2018 at 18:00, Cliff Resnick  wrote:
>
> While I can't speak for Spark, we do use the client API from Flink
> streaming and it's simple and seamless. It's especially nice if you require
> an Upsert semantic.
>
> On Feb 26, 2018 7:51 PM, "Ravi Kanth"  wrote:
>
> Hi,
>
> Anyone using Spark Streaming to ingest data into Kudu and using Kudu
> Client API to do so rather than the traditional KuduContext API? I am stuck
> at a point and couldn't find a solution.
>
> Thanks,
> Ravi
>
>
>
>


Re: Spark on Kudu Roadmap

2017-04-09 Thread Benjamin Kim
Hi Mike,

Thanks for the link. I guess further, deeper Spark integration is slowly
coming; as for when, we will have to wait and see.

Cheers,
Ben
 

> On Mar 27, 2017, at 12:25 PM, Mike Percy  wrote:
> 
> Hi Ben,
> I don't really know so I'll let someone else more familiar with the Spark 
> integration chime in on that. However I searched the Kudu JIRA and I don't 
> see a tracking ticket filed on this (the closest thing I could find was 
> https://issues.apache.org/jira/browse/KUDU-1676 
>  ) so you may want to file a 
> JIRA to help track this feature.
> 
> Mike
> 
> 
> On Mon, Mar 27, 2017 at 11:55 AM, Benjamin Kim  > wrote:
> Hi Mike,
> 
> I believe what we are looking for is this below. It is an often request use 
> case.
> 
> Anyone know if the Spark package will ever allow for creating tables in Spark 
> SQL?
> 
> Such as:
>CREATE EXTERNAL TABLE 
>USING org.apache.kudu.spark.kudu
>OPTIONS (Map("kudu.master" -> “", "kudu.table" -> 
> “table-name”));
> 
> In this way, plain SQL can be used to do DDL, DML statements whether in Spark 
> SQL code or using JDBC to interface with Spark SQL Thriftserver.
> 
> Thanks,
> Ben
> 
> 
> 
>> On Mar 27, 2017, at 11:01 AM, Mike Percy > > wrote:
>> 
>> Hi Ben,
>> Is there anything in particular you are looking for?
>> 
>> Thanks,
>> Mike
>> 
>> On Mon, Mar 27, 2017 at 9:48 AM, Benjamin Kim > > wrote:
>> Hi,
>> 
>> Are there any plans for deeper integration with Spark especially Spark SQL? 
>> Is there a roadmap to look at, so I can know what to expect in the future?
>> 
>> Cheers,
>> Ben
>> 
> 
> 



Re: Spark on Kudu Roadmap

2017-03-27 Thread Benjamin Kim
Hi Mike,

I believe what we are looking for is shown below. It is an often-requested
use case.

Does anyone know if the Spark package will ever allow for creating tables
in Spark SQL?

Such as:
   CREATE EXTERNAL TABLE <table-name>
   USING org.apache.kudu.spark.kudu
   OPTIONS (Map("kudu.master" -> "<kudu-master>", "kudu.table" -> "<table-name>"));

In this way, plain SQL can be used for DDL and DML statements, whether in
Spark SQL code or via JDBC against the Spark SQL Thriftserver.
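
Until such DDL support exists, one workaround is to create the table
through KuduContext and expose it to SQL as a temporary view. A sketch,
assuming Spark 2.x and a kudu-spark release whose KuduContext takes the
SparkContext; the table name, schema, and partitioning are illustrative:

import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.spark.kudu._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import scala.collection.JavaConverters._

val spark = SparkSession.builder().appName("kudu-ddl-workaround").getOrCreate()
val kuduContext = new KuduContext("kudu-master:7051", spark.sparkContext)

val schema = StructType(Seq(
  StructField("id", StringType, nullable = false),
  StructField("value", LongType, nullable = true)))

if (!kuduContext.tableExists("my_kudu_table")) {
  kuduContext.createTable("my_kudu_table", schema, Seq("id"),
    new CreateTableOptions()
      .addHashPartitions(List("id").asJava, 4)
      .setNumReplicas(3))
}

// SQL access without Hive-backed DDL: read the table and register a temp view.
val df = spark.read
  .options(Map("kudu.master" -> "kudu-master:7051", "kudu.table" -> "my_kudu_table"))
  .kudu
df.createOrReplaceTempView("my_kudu_table_view")
spark.sql("SELECT id, value FROM my_kudu_table_view WHERE value >= 5").show()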

Thanks,
Ben


> On Mar 27, 2017, at 11:01 AM, Mike Percy  wrote:
> 
> Hi Ben,
> Is there anything in particular you are looking for?
> 
> Thanks,
> Mike
> 
> On Mon, Mar 27, 2017 at 9:48 AM, Benjamin Kim  > wrote:
> Hi,
> 
> Are there any plans for deeper integration with Spark especially Spark SQL? 
> Is there a roadmap to look at, so I can know what to expect in the future?
> 
> Cheers,
> Ben
> 



Re: Spark on Kudu Roadmap

2017-03-27 Thread Mike Percy
Hi Ben,
Is there anything in particular you are looking for?

Thanks,
Mike

On Mon, Mar 27, 2017 at 9:48 AM, Benjamin Kim  wrote:

> Hi,
>
> Are there any plans for deeper integration with Spark especially Spark
> SQL? Is there a roadmap to look at, so I can know what to expect in the
> future?
>
> Cheers,
> Ben


Re: Spark on Kudu

2016-10-10 Thread Mark Hamstra
I realize that the Spark on Kudu work to date has been based on Spark 1.6,
where your statement about Spark SQL relying on Hive is true.  In Spark
2.0, however, that dependency no longer exists since Spark SQL essentially
copied over the parts of Hive that were needed into Spark itself, and has
been free to diverge since then.

On Mon, Oct 10, 2016 at 4:11 PM, Dan Burkert  wrote:

> Hi Ben,
>
> SparkSQL relies on Hive for DDL statements, so having support for this
> requires adding support to Hive for manipulating Kudu tables.  This is
> something that we would like to do in the long term, but there are no
> concrete plans (that I know of) to make it happen in the near term.
>
> - Dan
>
> On Thu, Oct 6, 2016 at 4:38 PM, Benjamin Kim  wrote:
>
>> Anyone know if the Spark package will ever allow for creating tables in
>> Spark SQL?
>>
>> Such as:
>>CREATE EXTERNAL TABLE 
>>USING org.apache.kudu.spark.kudu
>>OPTIONS (Map("kudu.master" -> “", "kudu.table" ->
>> “table-name”));
>>
>> In this way, plain SQL can be used to do DDL, DML statements whether in
>> Spark SQL code or using JDBC to interface with Spark SQL Thriftserver.
>>
>> By the way, we are trying to create a DMP in Kudu with the a farm of
>> RESTful Endpoints to do cookie sync, ad serving, segmentation data
>> exchange. And, the Spark compute cluster and the Kudu cluster will reside
>> on the same racks in the same datacenter.
>>
>> Thanks,
>> Ben
>>
>> On Sep 20, 2016, at 3:02 PM, Jordan Birdsell 
>> wrote:
>>
>> http://kudu.apache.org/docs/developing.html#_kudu_integration_with_spark
>>
>> On Tue, Sep 20, 2016 at 5:00 PM Benjamin Kim  wrote:
>>
>>> I see that the API has changed a bit so my old code doesn’t work
>>> anymore. Can someone direct me to some code samples?
>>>
>>> Thanks,
>>> Ben
>>>
>>>
>>> On Sep 20, 2016, at 1:44 PM, Todd Lipcon  wrote:
>>>
>>> On Tue, Sep 20, 2016 at 1:18 PM, Benjamin Kim  wrote
>>> :
>>>
 Now that Kudu 1.0.0 is officially out and ready for production use,
 where do we find the spark connector jar for this release?


>>> It's available in the official ASF maven repository:
>>> https://repository.apache.org/#nexus-search;quick~kudu-spark
>>>
>>> <dependency>
>>>   <groupId>org.apache.kudu</groupId>
>>>   <artifactId>kudu-spark_2.10</artifactId>
>>>   <version>1.0.0</version>
>>> </dependency>
>>>
>>>
>>> -Todd
>>>
>>>
>>>
 On Jun 17, 2016, at 11:08 AM, Dan Burkert  wrote:

 Hi Ben,

 To your first question about `CREATE TABLE` syntax with Kudu/Spark SQL,
 I do not think we support that at this point.  I haven't looked deeply into
 it, but we may hit issues specifying Kudu-specific options (partitioning,
 column encoding, etc.).  Probably issues that can be worked through
 eventually, though.  If you are interested in contributing to Kudu, this is
 an area that could obviously use improvement!  Most or all of our Spark
 features have been completely community driven to date.


> I am assuming that more Spark support along with semantic changes
> below will be incorporated into Kudu 0.9.1.
>

 As a rule we do not release new features in patch releases, but the
 good news is that we are releasing regularly, and our next scheduled
 release is for the August timeframe (see JD's roadmap
 
  email
 about what we are aiming to include).  Also, Cloudera does publish snapshot
 versions of the Spark connector here
 ,
 so the jars are available if you don't mind using snapshots.


> Anyone know of a better way to make unique primary keys other than
> using UUID to make every row unique if there is no unique column (or
> combination thereof) to use.
>

 Not that I know of.  In general it's pretty rare to have a dataset
 without a natural primary key (even if it's just all of the columns), but
 in those cases UUID is a good solution.
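
 For reference, a small sketch of generating such a surrogate key in Spark
 before writing (the column names and sample data are assumptions):

 import java.util.UUID
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.udf

 val spark = SparkSession.builder().appName("uuid-keys").getOrCreate()
 import spark.implicits._

 // Hypothetical keyless dataset.
 val events = Seq("click", "view", "click").toDF("event")

 // One random UUID per row as a surrogate primary key.
 val uuidUdf = udf(() => UUID.randomUUID().toString)
 val withKey = events.withColumn("id", uuidUdf())
 withKey.show(false)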


> This is what I am using. I know auto incrementing is coming down the
> line (don’t know when), but is there a way to simulate this in Kudu using
> Spark out of curiosity?
>

 To my knowledge there is no plan to have auto increment in Kudu.
 Distributed, consistent, auto incrementing counters is a difficult problem,
 and I don't think there are any known solutions that would be fast enough
 for Kudu (happy to be proven wrong, though!).

 - Dan


>
> Thanks,
> Ben
>
> On Jun 14, 2016, at 6:08 PM, Dan Burkert  wrote:
>
> I'm not sure exactly what the semantics will be, but at least one of
> them will be upsert.  These modes come from spark, and they were 

Re: Spark on Kudu

2016-09-20 Thread Benjamin Kim
Thanks!

> On Sep 20, 2016, at 3:02 PM, Jordan Birdsell  
> wrote:
> 
> http://kudu.apache.org/docs/developing.html#_kudu_integration_with_spark 
> 
> 
> On Tue, Sep 20, 2016 at 5:00 PM Benjamin Kim  > wrote:
> I see that the API has changed a bit so my old code doesn’t work anymore. Can 
> someone direct me to some code samples?
> 
> Thanks,
> Ben
> 
> 
>> On Sep 20, 2016, at 1:44 PM, Todd Lipcon > > wrote:
>> 
>> On Tue, Sep 20, 2016 at 1:18 PM, Benjamin Kim > > wrote:
>> Now that Kudu 1.0.0 is officially out and ready for production use, where do 
>> we find the spark connector jar for this release?
>> 
>> 
>> It's available in the official ASF maven repository:  
>> https://repository.apache.org/#nexus-search;quick~kudu-spark 
>> 
>> 
>> <dependency>
>>   <groupId>org.apache.kudu</groupId>
>>   <artifactId>kudu-spark_2.10</artifactId>
>>   <version>1.0.0</version>
>> </dependency>
>> 
>> 
>> -Todd
>>  
>> 
>> 
>>> On Jun 17, 2016, at 11:08 AM, Dan Burkert >> > wrote:
>>> 
>>> Hi Ben,
>>> 
>>> To your first question about `CREATE TABLE` syntax with Kudu/Spark SQL, I 
>>> do not think we support that at this point.  I haven't looked deeply into 
>>> it, but we may hit issues specifying Kudu-specific options (partitioning, 
>>> column encoding, etc.).  Probably issues that can be worked through 
>>> eventually, though.  If you are interested in contributing to Kudu, this is 
>>> an area that could obviously use improvement!  Most or all of our Spark 
>>> features have been completely community driven to date.
>>>  
>>> I am assuming that more Spark support along with semantic changes below 
>>> will be incorporated into Kudu 0.9.1.
>>> 
>>> As a rule we do not release new features in patch releases, but the good 
>>> news is that we are releasing regularly, and our next scheduled release is 
>>> for the August timeframe (see JD's roadmap 
>>> 
>>>  email about what we are aiming to include).  Also, Cloudera does publish 
>>> snapshot versions of the Spark connector here 
>>> , so 
>>> the jars are available if you don't mind using snapshots.
>>>  
>>> Anyone know of a better way to make unique primary keys other than using 
>>> UUID to make every row unique if there is no unique column (or combination 
>>> thereof) to use.
>>> 
>>> Not that I know of.  In general it's pretty rare to have a dataset without 
>>> a natural primary key (even if it's just all of the columns), but in those 
>>> cases UUID is a good solution.
>>>  
>>> This is what I am using. I know auto incrementing is coming down the line 
>>> (don’t know when), but is there a way to simulate this in Kudu using Spark 
>>> out of curiosity?
>>> 
>>> To my knowledge there is no plan to have auto increment in Kudu.  
>>> Distributed, consistent, auto incrementing counters is a difficult problem, 
>>> and I don't think there are any known solutions that would be fast enough 
>>> for Kudu (happy to be proven wrong, though!).
>>> 
>>> - Dan
>>>  
>>> 
>>> Thanks,
>>> Ben
>>> 
 On Jun 14, 2016, at 6:08 PM, Dan Burkert > wrote:
 
 I'm not sure exactly what the semantics will be, but at least one of them 
 will be upsert.  These modes come from spark, and they were really 
 designed for file-backed storage and not table storage.  We may want to do 
 append = upsert, and overwrite = truncate + insert.  I think that may 
 match the normal spark semantics more closely.
 
 - Dan
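
 For reference, a hedged sketch contrasting the two write paths discussed
 above: the generic data source save modes versus the explicit KuduContext
 call, as it appears in later kudu-spark releases (master address, table,
 and columns are illustrative; Spark 2.x assumed):

 import org.apache.kudu.spark.kudu._
 import org.apache.spark.sql.SparkSession

 val spark = SparkSession.builder().appName("kudu-write-modes").getOrCreate()
 import spark.implicits._

 // Hypothetical rows matching a Kudu table "my_table" (id STRING primary key, value INT64).
 val df = Seq(("a", 1L), ("b", 2L)).toDF("id", "value")

 // Path 1: generic data source write; the SaveMode semantics are what is discussed above.
 df.write
   .format("org.apache.kudu.spark.kudu")
   .options(Map("kudu.master" -> "kudu-master:7051", "kudu.table" -> "my_table"))
   .mode("append")
   .save()

 // Path 2: explicit KuduContext call, which is unambiguous about the operation performed.
 val kuduContext = new KuduContext("kudu-master:7051", spark.sparkContext)
 kuduContext.upsertRows(df, "my_table")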
 
 On Tue, Jun 14, 2016 at 6:00 PM, Benjamin Kim > wrote:
 Dan,
 
 Thanks for the information. That would mean both “append” and “overwrite” 
 modes would be combined or not needed in the future.
 
 Cheers,
 Ben
 
> On Jun 14, 2016, at 5:57 PM, Dan Burkert  > wrote:
> 
> Right now append uses an update Kudu operation, which requires the row 
> already be present in the table. Overwrite maps to insert.  Kudu very 
> recently got upsert support baked in, but it hasn't yet been integrated 
> into the Spark connector.  So pretty soon these sharp edges will get a 
> lot better, since upsert is the way to go for most spark workloads.
> 
> - Dan
> 
> On Tue, Jun 14, 2016 at 5:41 PM, Benjamin Kim  > wrote:
> I tried to use the “append” mode, and it worked. Over 3.8 million rows in 
> 64s. I would