Re: spark on kudu performance!

2018-07-05 Thread Todd Lipcon
On Mon, Jun 11, 2018 at 5:52 AM, fengba...@uce.cn  wrote:

> Hi,
>
> I am following the Kudu website's development documentation and using
> Spark to analyze data stored in Kudu (Kudu version 1.6.0).
>
> The official example code is:
>
> val df = sqlContext.read.options(Map(
>   "kudu.master" -> "kudu.master:7051",
>   "kudu.table" -> "kudu_table")).kudu
> // Query using the Spark API...
> df.select("id").filter("id >= 5").show()
>
>
> My questions are:
>
> (1) If I use the official example code on my table, which holds about
> 1.8 billion rows, does creating the DataFrame and then applying the
> filter load all 1.8 billion rows into memory on every query? The
> performance is very poor.
>

That's not correct. DataFrames are lazily evaluated, so a filter like the
one above does not fully materialize the whole table into memory before it
begins to filter.

You can also use ".explain()" to see whether the filter you are specifying
is getting pushed down properly to Kudu.
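
For illustration, a minimal sketch of checking the plan (column and table
names are hypothetical):

    import org.apache.kudu.spark.kudu._

    val df = sqlContext.read.options(Map(
      "kudu.master" -> "kudu-master:7051",
      "kudu.table" -> "kudu_table")).kudu

    // If pushdown works, the physical plan's Kudu scan should carry the
    // predicate (e.g. id >= 5) instead of a Spark-side Filter over a full
    // table scan.
    df.select("id").filter("id >= 5").explain()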


>
> (2) If I instead create a time-based range partition on the
> 1.8-billion-row table and scan specific partitions directly with the
> underlying Java API, would each query then load only the data in the
> specified partitions rather than all 1.8 billion rows?
>
> Please give me some suggestions, thanks!
>
>
The above should happen automatically as long as the filter predicate has
been pushed down. Using 'explain()' and showing us the results, along with
the code you used to create your table, will help us understand what might
be the problem with performance.
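
For reference, a sketch of such a partition-pruned scan with the Java
client API, called here from Scala (table and column names are
hypothetical; the time bounds are placeholder epoch millis):

    import org.apache.kudu.client.{KuduClient, KuduPredicate}

    val client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()
    val table = client.openTable("kudu_table")
    val tsCol = table.getSchema.getColumn("event_time")

    val scanner = client.newScannerBuilder(table)
      .addPredicate(KuduPredicate.newComparisonPredicate(
        tsCol, KuduPredicate.ComparisonOp.GREATER_EQUAL, 1528675200000L))
      .addPredicate(KuduPredicate.newComparisonPredicate(
        tsCol, KuduPredicate.ComparisonOp.LESS, 1528761600000L))
      .build()

    // Only tablets in the matching range partitions are scanned.
    while (scanner.hasMoreRows) {
      val batch = scanner.nextRows()
      while (batch.hasNext) {
        val row = batch.next()
        // process row
      }
    }
    client.close()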

-Todd
--
Todd Lipcon
Software Engineer, Cloudera


spark on kudu performance!

2018-06-11 Thread fengba...@uce.cn
Hi,

I am following the Kudu website's development documentation and using
Spark to analyze data stored in Kudu (Kudu version 1.6.0).

The official example code is:

val df = sqlContext.read.options(Map(
  "kudu.master" -> "kudu.master:7051",
  "kudu.table" -> "kudu_table")).kudu
// Query using the Spark API...
df.select("id").filter("id >= 5").show()


My questions are:

(1) If I use the official example code on my table, which holds about 1.8
billion rows, does creating the DataFrame and then applying the filter load
all 1.8 billion rows into memory on every query? The performance is very
poor.

(2) If I instead create a time-based range partition on the
1.8-billion-row table and scan specific partitions directly with the
underlying Java API, would each query then load only the data in the
specified partitions rather than all 1.8 billion rows?

Please give me some suggestions, thanks!



UCE Logistics Co., Ltd. (优速物流有限公司)
Big Data Center, Feng Baoli (冯宝利)
Mobile: 15050552430
Email: fengba...@uce.cn


Re: Spark Streaming + Kudu

2018-03-06 Thread Ravi Kanth
>>>>>>> for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>>>> if (entry.getValue().getClass().equals(String.class)) {
>>>>>>> if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>>>> row.setNull(entry.getKey());
>>>>>>> else
>>>>>>> row.addString(entry.getKey(), (String) entry.getValue());
>>>>>>> } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>>>> if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>>>> row.setNull(entry.getKey());
>>>>>>> else
>>>>>>> row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>>>> } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>>>> if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>>>> row.setNull(entry.getKey());
>>>>>>> else
>>>>>>> row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>> session.apply(upsert);
>>>>>>> } catch (Exception e) {
>>>>>>> logger.error("Exception during upsert:", e);
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>> }
>>>>>>> class KuduConnection {
>>>>>>> private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>>>>>> private static Map<String, AsyncKuduClient> asyncCache = new
>>>>>>> HashMap<>();
>>>>>>> private static int ShutdownHookPriority = 100;
>>>>>>>
>>>>>>> static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>>>> if (!asyncCache.containsKey(kuduMaster)) {
>>>>>>> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>>>>>>> ShutdownHookManager.get().addShutdownHook(new Runnable() {
>>>>>>> @Override
>>>>>>> public void run() {
>>>>>>> try {
>>>>>>> asyncClient.close();
>>>>>>> } catch (Exception e) {
>>>>>>> logger.error("Exception closing async client", e);
>>>>>>> }
>>>>>>> }
>>>>>>> }, ShutdownHookPriority);
>>>>>>> asyncCache.put(kuduMaster, asyncClient);
>>>>>>> }
>>>>>>> return asyncCache.get(kuduMaster);
>>>>>>> }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Ravi
>>>>>>>
>>>>>>> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Ravi, it would be helpful if you could attach what you are
>>>>>>>> getting back from getPendingErrors() -- perhaps from dumping
>>>>>>>> RowError.toString() from items in the returned array -- and indicate 
>>>>>>>> what
>>>>>>>> you were hoping to get back. Note that a RowError can also return to 
>>>>>>>> you
>>>>>>>> the Operation
>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>>>>>>> that you used to generate the write. From the Operation, you can get 
>>>>>>>> the
>>>>>>>> original PartialRow
>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>>>>>>> object, which should be able to identify the affected row that the 
>>>>>>>> write
>>>>>>>> failed for. Does that help?
>>>>>>>>
>>>>>>>> Since you are using the Kudu client directly, Spark is not involved
>>>>>>>> from the Kudu perspective, so you will need to deal with Spark on your 
>>>>>>>> own
>>>>>>>> in that case.
>>>>>>>>
>>>>>>>> Mike
>>>>>>>>
>>>>>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth@gmail.com> wrote:

Re: Spark Streaming + Kudu

2018-03-06 Thread Mike Percy
>>>>>> ...never returned back until the connection is properly established. I tried doing all that you have suggested with no
>>>>>> luck. Attaching my KuduClient code.
>>>>>>
>>>>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>>>>
>>>>>> import java.util.HashMap;
>>>>>> import java.util.Iterator;
>>>>>> import java.util.Map;
>>>>>> import org.apache.hadoop.util.ShutdownHookManager;
>>>>>> import org.apache.kudu.client.*;
>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>> import org.slf4j.Logger;
>>>>>> import org.slf4j.LoggerFactory;
>>>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>>>
>>>>>> public class KuduProcess {
>>>>>> private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>>> private KuduTable table;
>>>>>> private KuduSession session;
>>>>>>
>>>>>> public static void upsertKudu(JavaRDD<Map<String, Object>> rdd,
>>>>>> String host, String tableName) {
>>>>>> rdd.foreachPartition(iterator -> {
>>>>>> RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator,
>>>>>> tableName, host);
>>>>>> int errorCount = errors.getRowErrors().length;
>>>>>> if(errorCount > 0){
>>>>>> throw new RuntimeException("Failed to write " + errorCount + "
>>>>>> messages into Kudu");
>>>>>> }
>>>>>> });
>>>>>> }
>>>>>> private static RowErrorsAndOverflowStatus
>>>>>> upsertOpIterator(Iterator<Map<String, Object>> iter, String
>>>>>> tableName, String host) {
>>>>>> try {
>>>>>> AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>>> KuduClient client = asyncClient.syncClient();
>>>>>> table = client.openTable(tableName);
>>>>>> session = client.newSession();
>>>>>> session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>>> while (iter.hasNext()) {
>>>>>> upsertOp(iter.next());
>>>>>> }
>>>>>> } catch (KuduException e) {
>>>>>> logger.error("Exception in upsertOpIterator method", e);
>>>>>> }
>>>>>> finally{
>>>>>> try {
>>>>>> session.close();
>>>>>> } catch (KuduException e) {
>>>>>> logger.error("Exception in Connection close", e);
>>>>>> }
>>>>>> }
>>>>>> return session.getPendingErrors();
>>>>>> // <-- Once the connection is lost, this line never gets called: the
>>>>>> // Spark job keeps running and processing records while the KuduClient
>>>>>> // is trying to connect to Kudu. Meanwhile, we are losing all the
>>>>>> // records.
>>>>>> }
>>>>>> public static void upsertOp(Map<String, Object> formattedMap) {
>>>>>> if (formattedMap.size() != 0) {
>>>>>> try {
>>>>>> Upsert upsert = table.newUpsert();
>>>>>> PartialRow row = upsert.getRow();
>>>>>> for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>>> if (entry.getValue().getClass().equals(String.class)) {
>>>>>> if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>>> row.setNull(entry.getKey());
>>>>>> else
>>>>>> row.addString(entry.getKey(), (String) entry.getValue());
>>>>>> } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>>> if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>>> row.setNull(entry.getKey());
>>>>>> else
>>>>>> row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>>> } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>>> if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>>> row.setNull(entry.getKey());
>>>>>> else
>>>>>> row.addInt(entry.getKey(), (Integer) entry.getValue());

Re: Spark Streaming + Kudu

2018-03-06 Thread Ravi Kanth
>>>> import org.slf4j.Logger;
>>>>> import org.slf4j.LoggerFactory;
>>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>>
>>>>> public class KuduProcess {
>>>>> private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>> private KuduTable table;
>>>>> private KuduSession session;
>>>>>
>>>>> public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String
>>>>> host, String tableName) {
>>>>> rdd.foreachPartition(iterator -> {
>>>>> RowErrorsAndOverflowStatus errors = upsertOpIterator(iterator,
>>>>> tableName, host);
>>>>> int errorCount = errors.getRowErrors().length;
>>>>> if(errorCount > 0){
>>>>> throw new RuntimeException("Failed to write " + errorCount + "
>>>>> messages into Kudu");
>>>>> }
>>>>> });
>>>>> }
>>>>> private static RowErrorsAndOverflowStatus
>>>>> upsertOpIterator(Iterator<Map<String, Object>> iter, String
>>>>> tableName, String host) {
>>>>> try {
>>>>> AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>> KuduClient client = asyncClient.syncClient();
>>>>> table = client.openTable(tableName);
>>>>> session = client.newSession();
>>>>> session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>> while (iter.hasNext()) {
>>>>> upsertOp(iter.next());
>>>>> }
>>>>> } catch (KuduException e) {
>>>>> logger.error("Exception in upsertOpIterator method", e);
>>>>> }
>>>>> finally{
>>>>> try {
>>>>> session.close();
>>>>> } catch (KuduException e) {
>>>>> logger.error("Exception in Connection close", e);
>>>>> }
>>>>> }
>>>>> return session.getPendingErrors();
>>>>> // <-- Once the connection is lost, this line never gets called: the
>>>>> // Spark job keeps running and processing records while the KuduClient
>>>>> // is trying to connect to Kudu. Meanwhile, we are losing all the
>>>>> // records.
>>>>> }
>>>>> public static void upsertOp(Map<String, Object> formattedMap) {
>>>>> if (formattedMap.size() != 0) {
>>>>> try {
>>>>> Upsert upsert = table.newUpsert();
>>>>> PartialRow row = upsert.getRow();
>>>>> for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>> if (entry.getValue().getClass().equals(String.class)) {
>>>>> if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>> row.setNull(entry.getKey());
>>>>> else
>>>>> row.addString(entry.getKey(), (String) entry.getValue());
>>>>> } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>> if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>> row.setNull(entry.getKey());
>>>>> else
>>>>> row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>> } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>> if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>> row.setNull(entry.getKey());
>>>>> else
>>>>> row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>> }
>>>>> }
>>>>>
>>>>> session.apply(upsert);
>>>>> } catch (Exception e) {
>>>>> logger.error("Exception during upsert:", e);
>>>>> }
>>>>> }
>>>>> }
>>>>> }
>>>>> class KuduConnection {
>>>>> private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>>>> private static Map<String, AsyncKuduClient> asyncCache = new
>>>>> HashMap<>();
>>>>> private static int ShutdownHookPriority = 100;
>>>>>
>>>>> static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>> if (!asyncCache.containsKey(kuduMaster)) {
>>>>> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();

Re: Spark Streaming + Kudu

2018-03-06 Thread Ravi Kanth
>>> table = client.openTable(tableName);
>>> session = client.newSession();
>>> session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>> while (iter.hasNext()) {
>>> upsertOp(iter.next());
>>> }
>>> } catch (KuduException e) {
>>> logger.error("Exception in upsertOpIterator method", e);
>>> }
>>> finally{
>>> try {
>>> session.close();
>>> } catch (KuduException e) {
>>> logger.error("Exception in Connection close", e);
>>> }
>>> }
>>> return session.getPendingErrors();
>>> // <-- Once the connection is lost, this part of the code never gets
>>> // called: the Spark job keeps running and processing records while the
>>> // KuduClient is trying to connect to Kudu. Meanwhile, we are losing all
>>> // the records.
>>> }
>>> public static void upsertOp(Map<String, Object> formattedMap) {
>>> if (formattedMap.size() != 0) {
>>> try {
>>> Upsert upsert = table.newUpsert();
>>> PartialRow row = upsert.getRow();
>>> for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>> if (entry.getValue().getClass().equals(String.class)) {
>>> if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>> row.setNull(entry.getKey());
>>> else
>>> row.addString(entry.getKey(), (String) entry.getValue());
>>> } else if (entry.getValue().getClass().equals(Long.class)) {
>>> if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>> row.setNull(entry.getKey());
>>> else
>>> row.addLong(entry.getKey(), (Long) entry.getValue());
>>> } else if (entry.getValue().getClass().equals(Integer.class)) {
>>> if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>> row.setNull(entry.getKey());
>>> else
>>> row.addInt(entry.getKey(), (Integer) entry.getValue());
>>> }
>>> }
>>>
>>> session.apply(upsert);
>>> } catch (Exception e) {
>>> logger.error("Exception during upsert:", e);
>>> }
>>> }
>>> }
>>> }
>>> class KuduConnection {
>>> private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>>> private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>>> private static int ShutdownHookPriority = 100;
>>>
>>> static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>> if (!asyncCache.containsKey(kuduMaster)) {
>>> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>>> ShutdownHookManager.get().addShutdownHook(new Runnable() {
>>> @Override
>>> public void run() {
>>> try {
>>> asyncClient.close();
>>> } catch (Exception e) {
>>> logger.error("Exception closing async client", e);
>>> }
>>> }
>>> }, ShutdownHookPriority);
>>> asyncCache.put(kuduMaster, asyncClient);
>>> }
>>> return asyncCache.get(kuduMaster);
>>> }
>>> }
>>>
>>>
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>>>
>>>> Hi Ravi, it would be helpful if you could attach what you are getting
>>>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>>>> from items in the returned array -- and indicate what you were hoping to
>>>> get back. Note that a RowError can also return to you the Operation
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>>> that you used to generate the write. From the Operation, you can get the
>>>> original PartialRow
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>>> object, which should be able to identify the affected row that the write
>>>> failed for. Does that help?
>>>>
>>>> Since you are using the Kudu client directly, Spark is not involved
>>>> from the Kudu perspective, so you will need to deal with Spark on your own
>>>> in that case.
>>>>
>>>> Mike
>>>>
>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi Mike,
>>>>>
>>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.

Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
>> class KuduConnection {
>> private static Logger logger = LoggerFactory.getLogger(KuduConnection.class);
>> private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>> private static int ShutdownHookPriority = 100;
>>
>> static AsyncKuduClient getAsyncClient(String kuduMaster) {
>> if (!asyncCache.containsKey(kuduMaster)) {
>> AsyncKuduClient asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>> ShutdownHookManager.get().addShutdownHook(new Runnable() {
>> @Override
>> public void run() {
>> try {
>> asyncClient.close();
>> } catch (Exception e) {
>> logger.error("Exception closing async client", e);
>> }
>> }
>> }, ShutdownHookPriority);
>> asyncCache.put(kuduMaster, asyncClient);
>> }
>> return asyncCache.get(kuduMaster);
>> }
>> }
>>
>>
>>
>> Thanks,
>> Ravi
>>
>> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>>
>>> Hi Ravi, it would be helpful if you could attach what you are getting
>>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>>> from items in the returned array -- and indicate what you were hoping to
>>> get back. Note that a RowError can also return to you the Operation
>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>> that you used to generate the write. From the Operation, you can get the
>>> original PartialRow
>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>> object, which should be able to identify the affected row that the write
>>> failed for. Does that help?
>>>
>>> Since you are using the Kudu client directly, Spark is not involved from
>>> the Kudu perspective, so you will need to deal with Spark on your own in
>>> that case.
>>>
>>> Mike
>>>
>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth@gmail.com>
>>> wrote:
>>>
>>>> Hi Mike,
>>>>
>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>>
>>>> So, I am trying to use the Kudu client API to perform UPSERTs into
>>>> Kudu, and I integrated this with Spark. I am trying to test the case
>>>> where one of the Kudu servers fails. In this case, if there is any
>>>> problem in writing, getPendingErrors() should give me a way to handle
>>>> these errors so that I can successfully terminate my Spark job. This is
>>>> what I am trying to do.
>>>>
>>>> But I am not able to get a hold of the exceptions being thrown from
>>>> within the KuduClient while it retries connecting to the tablet server;
>>>> getPendingErrors() is not catching these exceptions.
>>>>
>>>> Let me know if you need more clarification. I can post some Snippets.
>>>>
>>>> Thanks,
>>>> Ravi
>>>>
>>>> On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:
>>>>
>>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>>> You mention that you are trying to use getPendingErrors()
>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>>  but
>>>>> it sounds like it's not working for you -- can you be more specific about
>>>>> what you expect and what you are observing?
>>>>>
>>>>> Thanks,
>>>>> Mike
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks, Clifford. We are running Kudu version 1.4. To date we haven't
>>>>>> seen any issues in production and we are not losing tablet servers.
>>>>>> But, as part of testing, I have to generate a few unforeseen cases to
>>>>>> analyse the application's performance. One of those is bringing down
>>>>>> the tablet server or master server intentionally, during which I
>>>>>> observed the loss of records. Just wanted to test cases outside the
>>>>>> happy path here. Once again, thanks for taking the time to respond to
>>>>>> me.

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
> }, ShutdownHookPriority);
> asyncCache.put(kuduMaster, asyncClient);
> }
> return asyncCache.get(kuduMaster);
> }
> }
>
>
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:
>
>> Hi Ravi, it would be helpful if you could attach what you are getting
>> back from getPendingErrors() -- perhaps from dumping RowError.toString()
>> from items in the returned array -- and indicate what you were hoping to
>> get back. Note that a RowError can also return to you the Operation
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>> that you used to generate the write. From the Operation, you can get the
>> original PartialRow
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>> object, which should be able to identify the affected row that the write
>> failed for. Does that help?
>>
>> Since you are using the Kudu client directly, Spark is not involved from
>> the Kudu perspective, so you will need to deal with Spark on your own in
>> that case.
>>
>> Mike
>>
>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth@gmail.com>
>> wrote:
>>
>>> Hi Mike,
>>>
>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>
>>> So, I am trying to use the Kudu client API to perform UPSERTs into Kudu,
>>> and I integrated this with Spark. I am trying to test the case where one
>>> of the Kudu servers fails. In this case, if there is any problem in
>>> writing, getPendingErrors() should give me a way to handle these errors
>>> so that I can successfully terminate my Spark job. This is what I am
>>> trying to do.
>>>
>>> But I am not able to get a hold of the exceptions being thrown from
>>> within the KuduClient while it retries connecting to the tablet server;
>>> getPendingErrors() is not catching these exceptions.
>>>
>>> Let me know if you need more clarification. I can post some Snippets.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:
>>>
>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>> You mention that you are trying to use getPendingErrors()
>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>  but
>>>> it sounds like it's not working for you -- can you be more specific about
>>>> what you expect and what you are observing?
>>>>
>>>> Thanks,
>>>> Mike
>>>>
>>>>
>>>>
>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth@gmail.com>
>>>> wrote:
>>>>
>>>>> Thanks, Clifford. We are running Kudu version 1.4. To date we haven't
>>>>> seen any issues in production and we are not losing tablet servers.
>>>>> But, as part of testing, I have to generate a few unforeseen cases to
>>>>> analyse the application's performance. One of those is bringing down
>>>>> the tablet server or master server intentionally, during which I
>>>>> observed the loss of records. Just wanted to test cases outside the
>>>>> happy path here. Once again, thanks for taking the time to respond to
>>>>> me.
>>>>>
>>>>> - Ravi
>>>>>
>>>>> On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com> wrote:
>>>>>
>>>>>> I'll have to get back to you on the code bits, but I'm pretty sure
>>>>>> we're doing simple sync batching. We're not in production yet, but after
>>>>>> some months of development I haven't seen any failures, even when pushing
>>>>>> load doing multiple years' backfill. I think the real question is why are
>>>>>> you losing tablet servers? The only instability we ever had with Kudu was
>>>>>> when it had that weird ntp sync issue that was fixed I think for 1.6. 
>>>>>> What
>>>>>> version are you running?
>>>>>>
>>>>>> Anyway, I would think that infinite loop should be catchable
>>>>>> somewhere. Our pipeline is set to fail/retry with Flink snapshots. I
>>>>>> imagine there is something similar with Spark. Sorry I can't be of
>>>>>> more help!

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
Mike,

Thanks for the information. But once the connection to any of the Kudu
servers is lost, there is no way I can control the KuduSession object, and
the same goes for getPendingErrors(). The KuduClient in this case becomes a
zombie and never returns until the connection is properly re-established. I
tried doing all that you have suggested, with no luck. Attaching my
KuduClient code.

package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.kudu.client.*;
import org.apache.spark.api.java.JavaRDD;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;

public class KuduProcess {
  private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
  // Note: declared static here so that the static methods below compile.
  private static KuduTable table;
  private static KuduSession session;

  public static void upsertKudu(JavaRDD<Map<String, Object>> rdd, String host,
      String tableName) {
    rdd.foreachPartition(iterator -> {
      RowErrorsAndOverflowStatus errors =
          upsertOpIterator(iterator, tableName, host);
      int errorCount = errors.getRowErrors().length;
      if (errorCount > 0) {
        throw new RuntimeException(
            "Failed to write " + errorCount + " messages into Kudu");
      }
    });
  }

  private static RowErrorsAndOverflowStatus upsertOpIterator(
      Iterator<Map<String, Object>> iter, String tableName, String host) {
    try {
      AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
      KuduClient client = asyncClient.syncClient();
      table = client.openTable(tableName);
      session = client.newSession();
      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
      while (iter.hasNext()) {
        upsertOp(iter.next());
      }
    } catch (KuduException e) {
      logger.error("Exception in upsertOpIterator method", e);
    } finally {
      try {
        session.close();
      } catch (KuduException e) {
        logger.error("Exception in Connection close", e);
      }
    }
    return session.getPendingErrors();
    // <-- Once the connection is lost, this line never gets called: the Spark
    // job keeps running and processing records while the KuduClient is trying
    // to connect to Kudu. Meanwhile, we are losing all the records.
  }

  public static void upsertOp(Map<String, Object> formattedMap) {
    if (formattedMap.size() != 0) {
      try {
        Upsert upsert = table.newUpsert();
        PartialRow row = upsert.getRow();
        for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
          if (entry.getValue().getClass().equals(String.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
              row.setNull(entry.getKey());
            else
              row.addString(entry.getKey(), (String) entry.getValue());
          } else if (entry.getValue().getClass().equals(Long.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
              row.setNull(entry.getKey());
            else
              row.addLong(entry.getKey(), (Long) entry.getValue());
          } else if (entry.getValue().getClass().equals(Integer.class)) {
            if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
              row.setNull(entry.getKey());
            else
              row.addInt(entry.getKey(), (Integer) entry.getValue());
          }
        }

        session.apply(upsert);
      } catch (Exception e) {
        logger.error("Exception during upsert:", e);
      }
    }
  }
}

class KuduConnection {
  private static Logger logger =
      LoggerFactory.getLogger(KuduConnection.class);
  private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
  private static int ShutdownHookPriority = 100;

  static AsyncKuduClient getAsyncClient(String kuduMaster) {
    if (!asyncCache.containsKey(kuduMaster)) {
      AsyncKuduClient asyncClient =
          new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
      ShutdownHookManager.get().addShutdownHook(new Runnable() {
        @Override
        public void run() {
          try {
            asyncClient.close();
          } catch (Exception e) {
            logger.error("Exception closing async client", e);
          }
        }
      }, ShutdownHookPriority);
      asyncCache.put(kuduMaster, asyncClient);
    }
    return asyncCache.get(kuduMaster);
  }
}



Thanks,
Ravi
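
A possible mitigation here, sketched as an assumption rather than a tested
fix: bound the client's operation timeout when building it in
KuduConnection, so that writes fail fast with an exception instead of
retrying indefinitely while the cluster is unreachable.

    // Hypothetical variant of KuduConnection.getAsyncClient with explicit
    // timeouts; these builder methods exist in the Kudu Java client, and
    // the values are illustrative.
    import org.apache.kudu.client.AsyncKuduClient

    val asyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
      .defaultOperationTimeoutMs(30000)      // bound each write/flush
      .defaultAdminOperationTimeoutMs(30000) // bound openTable and DDL calls
      .build()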

On 5 March 2018 at 16:20, Mike Percy <mpe...@apache.org> wrote:

> Hi Ravi, it would be helpful if you could attach what you are getting back
> from getPendingErrors() -- perhaps from dumping RowError.toString() from
> items in the returned array -- and indicate what you were hoping to get
> back. Note that a RowError can also return to you the Operation
> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
> that you used to generate the write. From the Operation, you can get the
> original PartialRow
> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
> object, which should be able to identify the affected row that the write
> failed for. Does that help?
>
> Since you are using the Kudu client directly, Spark is not involved from
> the Kudu perspective, so you will need to deal with Spark on your own in
> that case.

Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
Hi Ravi, it would be helpful if you could attach what you are getting back
from getPendingErrors() -- perhaps from dumping RowError.toString() from
items in the returned array -- and indicate what you were hoping to get
back. Note that a RowError can also return to you the Operation
<https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
that you used to generate the write. From the Operation, you can get the
original PartialRow
<https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
object, which should be able to identify the affected row that the write
failed for. Does that help?

Since you are using the Kudu client directly, Spark is not involved from
the Kudu perspective, so you will need to deal with Spark on your own in
that case.
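
For concreteness, a minimal sketch of dumping those pending errors
(assuming a session in a background or manual flush mode; names are
illustrative):

    import org.apache.kudu.client.KuduSession

    def dumpPendingErrors(session: KuduSession): Unit = {
      val pending = session.getPendingErrors()
      if (pending.isOverflowed)
        println("Error buffer overflowed: some row errors were discarded")
      for (err <- pending.getRowErrors) {
        println(err.toString)                   // status, server, message
        val failedRow = err.getOperation.getRow // the original PartialRow
        println(s"Failed row: $failedRow")
      }
    }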

Mike

On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth@gmail.com> wrote:

> Hi Mike,
>
> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>
> So, I am trying to use the Kudu client API to perform UPSERTs into Kudu,
> and I integrated this with Spark. I am trying to test the case where one
> of the Kudu servers fails. In this case, if there is any problem in
> writing, getPendingErrors() should give me a way to handle these errors so
> that I can successfully terminate my Spark job. This is what I am trying
> to do.
>
> But I am not able to get a hold of the exceptions being thrown from
> within the KuduClient while it retries connecting to the tablet server;
> getPendingErrors() is not catching these exceptions.
>
> Let me know if you need more clarification. I can post some Snippets.
>
> Thanks,
> Ravi
>
> On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:
>
>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>> You mention that you are trying to use getPendingErrors()
>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>  but
>> it sounds like it's not working for you -- can you be more specific about
>> what you expect and what you are observing?
>>
>> Thanks,
>> Mike
>>
>>
>>
>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth@gmail.com>
>> wrote:
>>
>>> Thanks, Clifford. We are running Kudu version 1.4. To date we haven't
>>> seen any issues in production and we are not losing tablet servers. But,
>>> as part of testing, I have to generate a few unforeseen cases to analyse
>>> the application's performance. One of those is bringing down the tablet
>>> server or master server intentionally, during which I observed the loss
>>> of records. Just wanted to test cases outside the happy path here. Once
>>> again, thanks for taking the time to respond to me.
>>>
>>> - Ravi
>>>
>>> On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com>
>>> wrote:
>>>
>>>> I'll have to get back to you on the code bits, but I'm pretty sure
>>>> we're doing simple sync batching. We're not in production yet, but after
>>>> some months of development I haven't seen any failures, even when pushing
>>>> load doing multiple years' backfill. I think the real question is why are
>>>> you losing tablet servers? The only instability we ever had with Kudu was
>>>> when it had that weird ntp sync issue that was fixed I think for 1.6. What
>>>> version are you running?
>>>>
>>>> Anyway, I would think that infinite loop should be catchable somewhere.
>>>> Our pipeline is set to fail/retry with Flink snapshots. I imagine there
>>>> is something similar with Spark. Sorry I can't be of more help!
>>>>
>>>>
>>>>
>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth@gmail.com> wrote:
>>>>
>>>> Cliff,
>>>>
>>>> Thanks for the response. Well, I do agree that it's simple and
>>>> seamless. In my case, I am able to upsert ~25000 events/sec into Kudu.
>>>> But I am facing a problem when any of the Kudu tablet or master servers
>>>> is down: I am not able to get a hold of the exception from the client.
>>>> The client goes into an infinite loop trying to connect to Kudu.
>>>> Meanwhile, I am losing my records. I tried handling the errors through
>>>> getPendingErrors(), but still it is helpless. I am using AsyncKuduClient
>>>> to establish the connection and retrieving the syncClient from the
>>>> Async to open the session and table. Any help?

Re: Spark Streaming + Kudu

2018-03-05 Thread Ravi Kanth
Hi Mike,

Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.

So, I am trying to use the Kudu client API to perform UPSERTs into Kudu,
and I integrated this with Spark. I am trying to test the case where one of
the Kudu servers fails. In this case, if there is any problem in writing,
getPendingErrors() should give me a way to handle these errors so that I
can successfully terminate my Spark job. This is what I am trying to do.

But I am not able to get a hold of the exceptions being thrown from within
the KuduClient while it retries connecting to the tablet server;
getPendingErrors() is not catching these exceptions.

Let me know if you need more clarification. I can post some Snippets.

Thanks,
Ravi

On 5 March 2018 at 13:18, Mike Percy <mpe...@apache.org> wrote:

> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
> You mention that you are trying to use getPendingErrors()
> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
> but
> it sounds like it's not working for you -- can you be more specific about
> what you expect and what you are observing?
>
> Thanks,
> Mike
>
>
>
> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth@gmail.com> wrote:
>
>> Thanks, Clifford. We are running Kudu version 1.4. To date we haven't seen
>> any issues in production and we are not losing tablet servers. But, as
>> part of testing, I have to generate a few unforeseen cases to analyse the
>> application's performance. One of those is bringing down the tablet server
>> or master server intentionally, during which I observed the loss of
>> records. Just wanted to test cases outside the happy path here. Once
>> again, thanks for taking the time to respond to me.
>>
>> - Ravi
>>
>> On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com> wrote:
>>
>>> I'll have to get back to you on the code bits, but I'm pretty sure we're
>>> doing simple sync batching. We're not in production yet, but after some
>>> months of development I haven't seen any failures, even when pushing load
>>> doing multiple years' backfill. I think the real question is why are you
>>> losing tablet servers? The only instability we ever had with Kudu was when
>>> it had that weird ntp sync issue that was fixed I think for 1.6. What
>>> version are you running?
>>>
>>> Anyway, I would think that infinite loop should be catchable somewhere.
>>> Our pipeline is set to fail/retry with Flink snapshots. I imagine there
>>> is something similar with Spark. Sorry I can't be of more help!
>>>
>>>
>>>
>>> On Feb 26, 2018 9:10 PM, Ravi Kanth  wrote:
>>>
>>> Cliff,
>>>
>>> Thanks for the response. Well, I do agree that it's simple and seamless.
>>> In my case, I am able to upsert ~25000 events/sec into Kudu. But I am
>>> facing a problem when any of the Kudu tablet or master servers is down: I
>>> am not able to get a hold of the exception from the client. The client
>>> goes into an infinite loop trying to connect to Kudu. Meanwhile, I am
>>> losing my records. I tried handling the errors through
>>> getPendingErrors(), but still it is helpless. I am using AsyncKuduClient
>>> to establish the connection and retrieving the syncClient from the Async
>>> to open the session and table. Any help?
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 26 February 2018 at 18:00, Cliff Resnick  wrote:
>>>
>>> While I can't speak for Spark, we do use the client API from Flink
>>> streaming and it's simple and seamless. It's especially nice if you require
>>> an Upsert semantic.
>>>
>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth"  wrote:
>>>
>>> Hi,
>>>
>>> Anyone using Spark Streaming to ingest data into Kudu and using Kudu
>>> Client API to do so rather than the traditional KuduContext API? I am stuck
>>> at a point and couldn't find a solution.
>>>
>>> Thanks,
>>> Ravi
>>>
>>>
>>>
>>>
>>
>


Re: Spark Streaming + Kudu

2018-03-05 Thread Mike Percy
Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
<https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
You mention that you are trying to use getPendingErrors()
<https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
but
it sounds like it's not working for you -- can you be more specific about
what you expect and what you are observing?
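
As background, a minimal sketch of the flush-mode setup under discussion
(Kudu Java client called from Scala; the master address is hypothetical):

    import org.apache.kudu.client.{KuduClient, SessionConfiguration}

    val client = new KuduClient.KuduClientBuilder("kudu-master:7051").build()
    val session = client.newSession()
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)

    // ... apply() a batch of operations ...

    // In background-flush mode, per-row failures surface asynchronously, so
    // flush first and then inspect the accumulated errors rather than
    // expecting apply() to throw.
    session.flush()
    val pending = session.getPendingErrors()
    pending.getRowErrors.foreach(err => println(err.toString))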

Thanks,
Mike



On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth  wrote:

> Thanks, Clifford. We are running Kudu version 1.4. To date we haven't seen
> any issues in production and we are not losing tablet servers. But, as
> part of testing, I have to generate a few unforeseen cases to analyse the
> application's performance. One of those is bringing down the tablet server
> or master server intentionally, during which I observed the loss of
> records. Just wanted to test cases outside the happy path here. Once
> again, thanks for taking the time to respond to me.
>
> - Ravi
>
> On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com> wrote:
>
>> I'll have to get back to you on the code bits, but I'm pretty sure we're
>> doing simple sync batching. We're not in production yet, but after some
>> months of development I haven't seen any failures, even when pushing load
>> doing multiple years' backfill. I think the real question is why are you
>> losing tablet servers? The only instability we ever had with Kudu was when
>> it had that weird ntp sync issue that was fixed I think for 1.6. What
>> version are you running?
>>
>> Anyway, I would think that infinite loop should be catchable somewhere.
>> Our pipeline is set to fail/retry with Flink snapshots. I imagine there
>> is something similar with Spark. Sorry I can't be of more help!
>>
>>
>>
>> On Feb 26, 2018 9:10 PM, Ravi Kanth  wrote:
>>
>> Cliff,
>>
>> Thanks for the response. Well, I do agree that it's simple and seamless.
>> In my case, I am able to upsert ~25000 events/sec into Kudu. But I am
>> facing a problem when any of the Kudu tablet or master servers is down: I
>> am not able to get a hold of the exception from the client. The client
>> goes into an infinite loop trying to connect to Kudu. Meanwhile, I am
>> losing my records. I tried handling the errors through
>> getPendingErrors(), but still it is helpless. I am using AsyncKuduClient
>> to establish the connection and retrieving the syncClient from the Async
>> to open the session and table. Any help?
>>
>> Thanks,
>> Ravi
>>
>> On 26 February 2018 at 18:00, Cliff Resnick  wrote:
>>
>> While I can't speak for Spark, we do use the client API from Flink
>> streaming and it's simple and seamless. It's especially nice if you require
>> an Upsert semantic.
>>
>> On Feb 26, 2018 7:51 PM, "Ravi Kanth"  wrote:
>>
>> Hi,
>>
>> Anyone using Spark Streaming to ingest data into Kudu and using Kudu
>> Client API to do so rather than the traditional KuduContext API? I am stuck
>> at a point and couldn't find a solution.
>>
>> Thanks,
>> Ravi
>>
>>
>>
>>
>


Re: Spark Streaming + Kudu

2018-02-26 Thread Ravi Kanth
Thanks, Clifford. We are running Kudu version 1.4. To date we haven't seen
any issues in production and we are not losing tablet servers. But, as part
of testing, I have to generate a few unforeseen cases to analyse the
application's performance. One of those is bringing down the tablet server
or master server intentionally, during which I observed the loss of
records. Just wanted to test cases outside the happy path here. Once again,
thanks for taking the time to respond to me.

- Ravi

On 26 February 2018 at 19:58, Clifford Resnick <cresn...@mediamath.com> wrote:

> I'll have to get back to you on the code bits, but I'm pretty sure we're
> doing simple sync batching. We're not in production yet, but after some
> months of development I haven't seen any failures, even when pushing load
> doing multiple years' backfill. I think the real question is why are you
> losing tablet servers? The only instability we ever had with Kudu was when
> it had that weird ntp sync issue that was fixed I think for 1.6. What
> version are you running?
>
> Anyway, I would think that infinite loop should be catchable somewhere. Our
> pipeline is set to fail/retry with Flink snapshots. I imagine there is
> something similar with Spark. Sorry I can't be of more help!
>
>
>
> On Feb 26, 2018 9:10 PM, Ravi Kanth  wrote:
>
> Cliff,
>
> Thanks for the response. Well, I do agree that it's simple and seamless. In
> my case, I am able to upsert ~25000 events/sec into Kudu. But I am facing a
> problem when any of the Kudu tablet or master servers is down: I am not
> able to get a hold of the exception from the client. The client goes into
> an infinite loop trying to connect to Kudu. Meanwhile, I am losing my
> records. I tried handling the errors through getPendingErrors(), but still
> it is helpless. I am using AsyncKuduClient to establish the connection and
> retrieving the syncClient from the Async to open the session and table. Any
> help?
>
> Thanks,
> Ravi
>
> On 26 February 2018 at 18:00, Cliff Resnick  wrote:
>
> While I can't speak for Spark, we do use the client API from Flink
> streaming and it's simple and seamless. It's especially nice if you require
> an Upsert semantic.
>
> On Feb 26, 2018 7:51 PM, "Ravi Kanth"  wrote:
>
> Hi,
>
> Anyone using Spark Streaming to ingest data into Kudu and using Kudu
> Client API to do so rather than the traditional KuduContext API? I am stuck
> at a point and couldn't find a solution.
>
> Thanks,
> Ravi
>
>
>
>


Re: Spark on Kudu Roadmap

2017-04-09 Thread Benjamin Kim
Hi Mike,

Thanks for the link. I guess further, deeper Spark integration is slowly
coming; as for when, we will have to wait and see.

Cheers,
Ben
 

> On Mar 27, 2017, at 12:25 PM, Mike Percy  wrote:
> 
> Hi Ben,
> I don't really know so I'll let someone else more familiar with the Spark
> integration chime in on that. However, I searched the Kudu JIRA and I don't
> see a tracking ticket filed on this (the closest thing I could find was
> https://issues.apache.org/jira/browse/KUDU-1676), so you may want to file a
> JIRA to help track this feature.
> 
> Mike
> 
> 
> On Mon, Mar 27, 2017 at 11:55 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi Mike,
> 
> I believe what we are looking for is this below. It is an often-requested
> use case.
> 
> Anyone know if the Spark package will ever allow for creating tables in
> Spark SQL?
>
> Such as:
>
>     CREATE EXTERNAL TABLE 
>     USING org.apache.kudu.spark.kudu
>     OPTIONS (Map("kudu.master" -> "", "kudu.table" -> "table-name"));
> 
> In this way, plain SQL can be used to do DDL, DML statements whether in Spark 
> SQL code or using JDBC to interface with Spark SQL Thriftserver.
> 
> Thanks,
> Ben
> 
> 
> 
>> On Mar 27, 2017, at 11:01 AM, Mike Percy <mpe...@apache.org> wrote:
>> 
>> Hi Ben,
>> Is there anything in particular you are looking for?
>> 
>> Thanks,
>> Mike
>> 
>> On Mon, Mar 27, 2017 at 9:48 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> Hi,
>> 
>> Are there any plans for deeper integration with Spark especially Spark SQL? 
>> Is there a roadmap to look at, so I can know what to expect in the future?
>> 
>> Cheers,
>> Ben
>> 
> 
> 



Re: Spark on Kudu Roadmap

2017-03-27 Thread Benjamin Kim
Hi Mike,

I believe what we are looking for is this below. It is an often-requested
use case.

Anyone know if the Spark package will ever allow for creating tables in Spark 
SQL?

Such as:

    CREATE EXTERNAL TABLE 
    USING org.apache.kudu.spark.kudu
    OPTIONS (Map("kudu.master" -> "", "kudu.table" -> "table-name"));

In this way, plain SQL can be used to do DDL, DML statements whether in Spark 
SQL code or using JDBC to interface with Spark SQL Thriftserver.
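
For what it's worth, a sketch of what does work today: read the Kudu table
into a DataFrame, register it as a temporary table, and then run plain SQL
against it (Spark 1.6 API; the master address and names are hypothetical):

    import org.apache.kudu.spark.kudu._

    val df = sqlContext.read.options(Map(
      "kudu.master" -> "kudu-master:7051",
      "kudu.table" -> "table-name")).kudu

    // Registers the DataFrame for SQL access
    // (createOrReplaceTempView in Spark 2.x).
    df.registerTempTable("kudu_table")

    sqlContext.sql("SELECT id FROM kudu_table WHERE id >= 5").show()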

Thanks,
Ben


> On Mar 27, 2017, at 11:01 AM, Mike Percy  wrote:
> 
> Hi Ben,
> Is there anything in particular you are looking for?
> 
> Thanks,
> Mike
> 
> On Mon, Mar 27, 2017 at 9:48 AM, Benjamin Kim <bbuil...@gmail.com> wrote:
> Hi,
> 
> Are there any plans for deeper integration with Spark especially Spark SQL? 
> Is there a roadmap to look at, so I can know what to expect in the future?
> 
> Cheers,
> Ben
> 



Re: Spark on Kudu Roadmap

2017-03-27 Thread Mike Percy
Hi Ben,
Is there anything in particular you are looking for?

Thanks,
Mike

On Mon, Mar 27, 2017 at 9:48 AM, Benjamin Kim  wrote:

> Hi,
>
> Are there any plans for deeper integration with Spark especially Spark
> SQL? Is there a roadmap to look at, so I can know what to expect in the
> future?
>
> Cheers,
> Ben


Spark on Kudu Roadmap

2017-03-27 Thread Benjamin Kim
Hi,

Are there any plans for deeper integration with Spark especially Spark SQL? Is 
there a roadmap to look at, so I can know what to expect in the future?

Cheers,
Ben

Re: Spark on Kudu

2016-10-10 Thread Mark Hamstra
I realize that the Spark on Kudu work to date has been based on Spark 1.6,
where your statement about Spark SQL relying on Hive is true.  In Spark
2.0, however, that dependency no longer exists since Spark SQL essentially
copied over the parts of Hive that were needed into Spark itself, and has
been free to diverge since then.

On Mon, Oct 10, 2016 at 4:11 PM, Dan Burkert <d...@cloudera.com> wrote:

> Hi Ben,
>
> SparkSQL relies on Hive for DDL statements, so having support for this
> requires adding support to Hive for manipulating Kudu tables.  This is
> something that we would like to do in the long term, but there are no
> concrete plans (that I know of) to make it happen in the near term.
>
> - Dan
>
> On Thu, Oct 6, 2016 at 4:38 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>
>> Anyone know if the Spark package will ever allow for creating tables in
>> Spark SQL?
>>
>> Such as:
>>     CREATE EXTERNAL TABLE 
>>     USING org.apache.kudu.spark.kudu
>>     OPTIONS (Map("kudu.master" -> "", "kudu.table" -> "table-name"));
>>
>> In this way, plain SQL can be used to do DDL, DML statements whether in
>> Spark SQL code or using JDBC to interface with Spark SQL Thriftserver.
>>
>> By the way, we are trying to create a DMP in Kudu with the a farm of
>> RESTful Endpoints to do cookie sync, ad serving, segmentation data
>> exchange. And, the Spark compute cluster and the Kudu cluster will reside
>> on the same racks in the same datacenter.
>>
>> Thanks,
>> Ben
>>
>> On Sep 20, 2016, at 3:02 PM, Jordan Birdsell <jordantbirds...@gmail.com>
>> wrote:
>>
>> http://kudu.apache.org/docs/developing.html#_kudu_integration_with_spark
>>
>> On Tue, Sep 20, 2016 at 5:00 PM Benjamin Kim <bbuil...@gmail.com> wrote:
>>
>>> I see that the API has changed a bit so my old code doesn’t work
>>> anymore. Can someone direct me to some code samples?
>>>
>>> Thanks,
>>> Ben
>>>
>>>
>>> On Sep 20, 2016, at 1:44 PM, Todd Lipcon <t...@cloudera.com> wrote:
>>>
>>> On Tue, Sep 20, 2016 at 1:18 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>>
>>>> Now that Kudu 1.0.0 is officially out and ready for production use,
>>>> where do we find the spark connector jar for this release?
>>>>
>>>>
>>> It's available in the official ASF maven repository:
>>> https://repository.apache.org/#nexus-search;quick~kudu-spark
>>>
>>> <dependency>
>>>   <groupId>org.apache.kudu</groupId>
>>>   <artifactId>kudu-spark_2.10</artifactId>
>>>   <version>1.0.0</version>
>>> </dependency>
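
For build tools other than Maven, the same artifact can be declared in sbt
(equivalent coordinate, written as an sbt/Scala setting):

    // Same artifact as the Maven snippet above.
    libraryDependencies += "org.apache.kudu" % "kudu-spark_2.10" % "1.0.0"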
>>>
>>>
>>> -Todd
>>>
>>>
>>>
>>>> On Jun 17, 2016, at 11:08 AM, Dan Burkert <d...@cloudera.com> wrote:
>>>>
>>>> Hi Ben,
>>>>
>>>> To your first question about `CREATE TABLE` syntax with Kudu/Spark SQL,
>>>> I do not think we support that at this point.  I haven't looked deeply into
>>>> it, but we may hit issues specifying Kudu-specific options (partitioning,
>>>> column encoding, etc.).  Probably issues that can be worked through
>>>> eventually, though.  If you are interested in contributing to Kudu, this is
>>>> an area that could obviously use improvement!  Most or all of our Spark
>>>> features have been completely community driven to date.
>>>>
>>>>
>>>>> I am assuming that more Spark support along with semantic changes
>>>>> below will be incorporated into Kudu 0.9.1.
>>>>>
>>>>
>>>> As a rule we do not release new features in patch releases, but the
>>>> good news is that we are releasing regularly, and our next scheduled
>>>> release is for the August timeframe (see JD's roadmap
>>>> <https://lists.apache.org/thread.html/1a3b949e715a74d7f26bd9c102247441a06d16d077324ba39a662e2a@1455234076@%3Cdev.kudu.apache.org%3E>
>>>>  email
>>>> about what we are aiming to include).  Also, Cloudera does publish snapshot
>>>> versions of the Spark connector here
>>>> <https://repository.cloudera.com/cloudera/cloudera-repos/org/kududb/>,
>>>> so the jars are available if you don't mind using snapshots.
>>>>
>>>>
>>>>> Anyone know of a better way to make unique primary keys other than
>>>>> using UUID to make every row unique if there is no unique column (or
>>>>> combination thereof) to use.
>>>>>
>>

Re: Spark on Kudu

2016-09-20 Thread Benjamin Kim
Thanks!

> On Sep 20, 2016, at 3:02 PM, Jordan Birdsell <jordantbirds...@gmail.com> 
> wrote:
> 
> http://kudu.apache.org/docs/developing.html#_kudu_integration_with_spark
> 
> On Tue, Sep 20, 2016 at 5:00 PM Benjamin Kim <bbuil...@gmail.com> wrote:
> I see that the API has changed a bit so my old code doesn’t work anymore. Can 
> someone direct me to some code samples?
> 
> Thanks,
> Ben
> 
> 
>> On Sep 20, 2016, at 1:44 PM, Todd Lipcon <t...@cloudera.com> wrote:
>> 
>> On Tue, Sep 20, 2016 at 1:18 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>> Now that Kudu 1.0.0 is officially out and ready for production use, where do 
>> we find the spark connector jar for this release?
>> 
>> 
>> It's available in the official ASF maven repository:
>> https://repository.apache.org/#nexus-search;quick~kudu-spark
>> 
>> <dependency>
>>   <groupId>org.apache.kudu</groupId>
>>   <artifactId>kudu-spark_2.10</artifactId>
>>   <version>1.0.0</version>
>> </dependency>
>> 
>> 
>> -Todd
>>  
>> 
>> 
>>> On Jun 17, 2016, at 11:08 AM, Dan Burkert <d...@cloudera.com> wrote:
>>> 
>>> Hi Ben,
>>> 
>>> To your first question about `CREATE TABLE` syntax with Kudu/Spark SQL, I 
>>> do not think we support that at this point.  I haven't looked deeply into 
>>> it, but we may hit issues specifying Kudu-specific options (partitioning, 
>>> column encoding, etc.).  Probably issues that can be worked through 
>>> eventually, though.  If you are interested in contributing to Kudu, this is 
>>> an area that could obviously use improvement!  Most or all of our Spark 
>>> features have been completely community driven to date.
>>>  
>>> I am assuming that more Spark support along with semantic changes below 
>>> will be incorporated into Kudu 0.9.1.
>>> 
>>> As a rule we do not release new features in patch releases, but the good 
>>> news is that we are releasing regularly, and our next scheduled release is 
>>> for the August timeframe (see JD's roadmap 
>>> <https://lists.apache.org/thread.html/1a3b949e715a74d7f26bd9c102247441a06d16d077324ba39a662e2a@1455234076@%3Cdev.kudu.apache.org%3E>
>>>  email about what we are aiming to include).  Also, Cloudera does publish 
>>> snapshot versions of the Spark connector here 
>>> <https://repository.cloudera.com/cloudera/cloudera-repos/org/kududb/>, so 
>>> the jars are available if you don't mind using snapshots.
>>>  
>>> Anyone know of a better way to make unique primary keys other than using 
>>> UUID to make every row unique if there is no unique column (or combination 
>>> thereof) to use.
>>> 
>>> Not that I know of.  In general it's pretty rare to have a dataset without 
>>> a natural primary key (even if it's just all of the columns), but in those 
>>> cases UUID is a good solution.
>>>  
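
As an aside, a sketch of generating such UUID keys when building the
DataFrame in Spark (assumes a string key column named "id"; names are
illustrative):

    import java.util.UUID
    import org.apache.spark.sql.functions.udf

    // Non-deterministic UDF: each row gets a fresh random UUID as its key.
    val newUuid = udf(() => UUID.randomUUID().toString)
    val withKey = df.withColumn("id", newUuid())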
>>> This is what I am using. I know auto incrementing is coming down the line 
>>> (don’t know when), but is there a way to simulate this in Kudu using Spark 
>>> out of curiosity?
>>> 
>>> To my knowledge there is no plan to have auto increment in Kudu.  
>>> Distributed, consistent, auto incrementing counters is a difficult problem, 
>>> and I don't think there are any known solutions that would be fast enough 
>>> for Kudu (happy to be proven wrong, though!).
>>> 
>>> - Dan
>>>  
>>> 
>>> Thanks,
>>> Ben
>>> 
>>>> On Jun 14, 2016, at 6:08 PM, Dan Burkert <d...@cloudera.com> wrote:
>>>> 
>>>> I'm not sure exactly what the semantics will be, but at least one of them 
>>>> will be upsert.  These modes come from spark, and they were really 
>>>> designed for file-backed storage and not table storage.  We may want to do 
>>>> append = upsert, and overwrite = truncate + insert.  I think that may 
>>>> match the normal spark semantics more closely.
>>>> 
>>>> - Dan
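
A sketch of the DataFrame write path being discussed, as it later appeared
in kudu-spark (treat the mode comment as an assumption; this thread is
precisely about what those semantics should be):

    import org.apache.kudu.spark.kudu._

    df.write
      .options(Map(
        "kudu.master" -> "kudu-master:7051",
        "kudu.table" -> "table-name"))
      .mode("append") // per this thread, append is expected to act as upsert
      .kudu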
>>>> 
>>>> On Tue, Jun 14, 2016 at 6:00 PM, Benjamin Kim <bbuil...@gmail.com> wrote:
>>>> Dan,
>>>> 