Hi all,

I am trying to read data from Kafka and ingest it into Kudu using Spark
Streaming. I am not using KuduContext for the upsert into Kudu; instead
I use Kudu's native client API to build a PartialRow and apply the
operation for every record from Kafka. The Spark Streaming job runs fine
and I can see the data in the Kudu tables. However, after a few batches
have been processed, if I bring down the Kudu service, my executor
becomes a zombie (execution never reaches my executor class again), and
the internal threads that establish the connection to Kudu (which I do
not manage in my code) throw exceptions that I cannot handle, resulting
in message loss. The exception is below:


18/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
    at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
    at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
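
One mitigation I am considering is setting explicit client-side timeouts
when building the KuduClient, so that when Kudu is down the write path
fails with a catchable KuduException in my code instead of hanging
forever. A minimal sketch, assuming the same store.getKuduHost() as in
my class below; the timeout values are placeholder assumptions, not
tuned:

    import org.apache.kudu.client.KuduClient;

    // Bound how long RPCs can block so a downed Kudu surfaces as a
    // catchable KuduException instead of a hung executor thread.
    private static KuduClient buildClientWithTimeouts(String masterAddresses) {
        return new KuduClient.KuduClientBuilder(masterAddresses)
                .defaultOperationTimeoutMs(30_000)       // apply()/flush() path
                .defaultAdminOperationTimeoutMs(10_000)  // openTable() and other admin calls
                .build();
    }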

Also, here is my executor code (one of the map transformations in the
lineage calls the class below), which establishes the connection to Kudu
once per JVM when the application starts:


package org.dwh.streaming.kudu.sparkkudustreaming;

import java.util.List;
import java.util.Map;

import org.apache.kudu.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.dwh.streaming.kudu.sparkkudustreaming.config.LoadAppConf;
import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
import org.dwh.streaming.kudu.sparkkudustreaming.models.Store;

public class KuduProcess {
    private static final Logger logger = LoggerFactory.getLogger(KuduProcess.class);

    // Eagerly-initialized singleton: the constructor runs once per JVM
    // and opens the client, table and session shared by all tasks.
    private static final KuduProcess instance = new KuduProcess();
    private static KuduClient client;
    private static KuduTable table;
    private static KuduSession session;

    private KuduProcess() {
        try {
            Store store = LoadAppConf.loadAppConf();
            client = new KuduClient.KuduClientBuilder(store.getKuduHost()).build();
            table = client.openTable(store.getKuduTable());
            session = client.newSession();
            session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
        } catch (KuduException e) {
            logger.error("Kudu Exception: " + e.getMessage());
        }
    }

    public static String upsertKudu(Map<String, Object> formattedMap) {
        if (!formattedMap.isEmpty()) {
            try {
                Upsert upsert = table.newUpsert();
                PartialRow row = upsert.getRow();
                for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
                    Object value = entry.getValue();
                    // instanceof is null-safe, unlike getClass().equals(...)
                    if (value instanceof String) {
                        if (value.equals(SpecialNullConstants.specialStringNull))
                            row.setNull(entry.getKey());
                        else
                            row.addString(entry.getKey(), (String) value);
                    } else if (value instanceof Long) {
                        if (value.equals(SpecialNullConstants.specialLongNull))
                            row.setNull(entry.getKey());
                        else
                            row.addLong(entry.getKey(), (Long) value);
                    } else if (value instanceof Integer) {
                        if (value.equals(SpecialNullConstants.specialIntNull))
                            row.setNull(entry.getKey());
                        else
                            row.addInt(entry.getKey(), (Integer) value);
                    }
                }
                session.apply(upsert);
                List<OperationResponse> responses = session.flush();
                for (OperationResponse r : responses) {
                    if (r.hasRowError()) {
                        RowError e = r.getRowError();
                        // getErrorStatus() returns a Status, not a String, so
                        // check isAlreadyPresent() rather than comparing with
                        // "ALREADY_PRESENT" (which is always false).
                        if (e.getErrorStatus().isAlreadyPresent()) {
                            continue;
                        }
                        logger.error("Error inserting " + e.getOperation().toString()
                                + ": " + e.toString());
                    }
                }
            } catch (Exception e) {
                logger.error("Exception during upsert:", e);
            }
        }
        return "SUCCESS";
    }
}
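
One direction I am exploring, in case it helps frame suggestions:
instead of logging row errors and returning "SUCCESS" unconditionally,
propagate the failure out of upsertKudu so the Spark task fails and the
Kafka offsets for that batch are not committed (at-least-once delivery
instead of silent loss). A rough sketch of just the flush handling,
assuming the same session as in the class above; flushOrFail is a
hypothetical helper of mine, not a Kudu API:

    // Throw instead of swallowing row errors so the Spark task fails,
    // the batch is retried, and no offsets advance past unwritten records.
    private static void flushOrFail(KuduSession session) throws KuduException {
        for (OperationResponse r : session.flush()) {
            if (r.hasRowError()) {
                RowError e = r.getRowError();
                if (e.getErrorStatus().isAlreadyPresent()) {
                    continue; // duplicate upserts are harmless to retry
                }
                throw new RuntimeException("Kudu write failed: " + e.toString());
            }
        }
    }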


Any suggestions on how to handle this case and avoid data loss would be
appreciated.


