I look at the log files through the Hadoop YARN web interface, i.e. I am
actually looking in the jobmanager.log of the container running the Flink task.
That is where I found these messages.

I call
    hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
in all places directly after HBaseConfiguration.create().
That way I simply force the task to look on the actual Hadoop node for the
same file it already loaded locally.
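Before relying on that `file:` path, it may be worth confirming on the node itself that the file exists and which quorum it defines. Below is a minimal, dependency-free sketch using only the JDK's DOM parser. The class and method names are hypothetical, and it deliberately does not use Hadoop's Configuration class, so it ignores hbase-default.xml and `<final>` handling.

```java
import java.io.File;
import java.io.StringReader;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical diagnostic helper: reads a Hadoop-style *-site.xml directly.
public class HBaseSiteQuorum {

  // Returns the <value> of the last <property> whose <name> matches, or null.
  static String readProperty(Document doc, String name) {
    String result = null;
    NodeList props = doc.getElementsByTagName("property");
    for (int i = 0; i < props.getLength(); i++) {
      Element prop = (Element) props.item(i);
      NodeList names = prop.getElementsByTagName("name");
      NodeList values = prop.getElementsByTagName("value");
      if (names.getLength() > 0 && values.getLength() > 0
          && name.equals(names.item(0).getTextContent().trim())) {
        result = values.item(0).getTextContent().trim();
      }
    }
    return result;
  }

  // Parses an XML string (convenient for quick checks and tests).
  static String readPropertyFromXml(String xml, String name) {
    try {
      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
          .parse(new InputSource(new StringReader(xml)));
      return readProperty(doc, name);
    } catch (Exception e) {
      throw new IllegalArgumentException("Cannot parse XML", e);
    }
  }

  // Parses a file on the local node; returns null if the file does not exist.
  static String readPropertyFromFile(String path, String name) {
    File file = new File(path);
    if (!file.isFile()) {
      return null;
    }
    try {
      Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(file);
      return readProperty(doc, name);
    } catch (Exception e) {
      throw new IllegalArgumentException("Cannot parse " + path, e);
    }
  }

  public static void main(String[] args) {
    String path = args.length > 0 ? args[0] : "/etc/hbase/conf/hbase-site.xml";
    String quorum = readPropertyFromFile(path, "hbase.zookeeper.quorum");
    System.out.println(path + " -> hbase.zookeeper.quorum = " + quorum);
  }
}
```

Run on a cluster node, a `null` result would mean the file is missing there, in which case the `file:` workaround cannot help either.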

The reason I suspect Flink is that the client-side part of the Flink
application does have the right settings, while the task/job actually running
in the cluster does not.
So it seems that, during the transition into the cluster, the application for
some reason does not copy everything it has available locally.
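One way to test that suspicion is to log, from inside the task (for example in configure()), whether hbase-site.xml is visible on the container's classpath. HBaseConfiguration.create() loads hbase-site.xml as a classpath resource, so if this lookup fails in the container, the values from hbase-default.xml win. A minimal JDK-only sketch; the class and method names are hypothetical, and falling back from the context classloader to the class's own loader is an assumption that, as far as I know, mirrors what Hadoop's Configuration does by default.

```java
import java.net.URL;

// Hypothetical diagnostic: where (if anywhere) is a resource on the classpath?
public class ClasspathConfigCheck {

  // Returns the URL of the resource, or null if no classloader can see it.
  static URL locate(String resourceName) {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    if (loader == null) {
      loader = ClasspathConfigCheck.class.getClassLoader();
    }
    return loader.getResource(resourceName);
  }

  // One-line summary suitable for a log statement.
  static String describe(String resourceName) {
    URL url = locate(resourceName);
    return url == null
        ? resourceName + " NOT found on the classpath"
        : resourceName + " found at " + url;
  }

  public static void main(String[] args) {
    System.out.println(describe("hbase-site.xml"));
    System.out.println("java.class.path = " + System.getProperty("java.class.path"));
  }
}
```

Comparing this output in the client log with the output in the container's log should show whether /etc/hbase/conf/ makes it onto the task's classpath.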

There is a very high probability that I did something wrong; I'm just not
seeing it at the moment.

Niels



On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:

> Hi,
>
> What do you mean by saying:
>
> When I open the log files on the Hadoop cluster I see this:
>
>
> The error doesn’t come from Flink? Where do you execute
>
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>
> ?
>
> To me it seems like it is a problem with misconfigured HBase and not
> something related to Flink.
>
> Piotrek
>
> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl> wrote:
>
> To make it easier for you to help me, I put this test project on GitHub:
> https://github.com/nielsbasjes/FlinkHBaseConnectProblem
>
> Niels Basjes
>
> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl> wrote:
>
>> Hi,
>>
>> I have a Flink 1.3.2 application that I want to run on a Hadoop YARN
>> cluster where I need to connect to HBase.
>>
>> What I have:
>>
>> In my environment:
>> HADOOP_CONF_DIR=/etc/hadoop/conf/
>> HBASE_CONF_DIR=/etc/hbase/conf/
>> HIVE_CONF_DIR=/etc/hive/conf/
>> YARN_CONF_DIR=/etc/hadoop/conf/
>>
>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper
>> hosts for HBase.
>>
>> My test code is this:
>>
>> public class Main {
>>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>>
>>   public static void main(String[] args) throws Exception {
>>     printZookeeperConfig();
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>     env.createInput(new HBaseSource()).print();
>>     env.execute("HBase config problem");
>>   }
>>
>>   public static void printZookeeperConfig() {
>>     String zookeeper =
>>         HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>>     LOG.info("----> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>>   }
>>
>>   public static class HBaseSource extends AbstractTableInputFormat<String> {
>>     @Override
>>     public void configure(org.apache.flink.configuration.Configuration parameters) {
>>       table = createTable();
>>       if (table != null) {
>>         scan = getScanner();
>>       }
>>     }
>>
>>     private HTable createTable() {
>>       LOG.info("Initializing HBaseConfiguration");
>>       // Uses files found in the classpath
>>       org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>>       printZookeeperConfig();
>>
>>       try {
>>         return new HTable(hConf, getTableName());
>>       } catch (Exception e) {
>>         LOG.error("Error instantiating a new HTable instance", e);
>>       }
>>       return null;
>>     }
>>
>>     @Override
>>     public String getTableName() {
>>       return "bugs:flink";
>>     }
>>
>>     @Override
>>     protected String mapResultToOutType(Result result) {
>>       // Decode explicitly as UTF-8 instead of the platform default charset
>>       return new String(
>>           result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)), UTF_8);
>>     }
>>
>>     @Override
>>     protected Scan getScanner() {
>>       return new Scan();
>>     }
>>   }
>>
>> }
>>
>>
>> I run this application with this command on my Yarn cluster (note: first
>> starting a yarn-cluster and then submitting the job yields the same result).
>>
>> flink \
>>     run \
>>     -m yarn-cluster \
>>     --yarncontainer 1 \
>>     --yarnname "Flink on Yarn HBase problem" \
>>     --yarnslots                     1     \
>>     --yarnjobManagerMemory          4000  \
>>     --yarntaskManagerMemory         4000  \
>>     --yarnstreaming                       \
>>     target/flink-hbase-connect-1.0-SNAPSHOT.jar
>>
>> Now in the client-side log file
>> /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see:
>>
>> 1) The classpath contains /etc/hbase/conf/ both near the start and at the
>> end.
>>
>> 2) The ZooKeeper settings of my experimental environment have been picked
>> up by the software:
>>
>> 2017-10-20 11:17:23,973 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181
>>
>>
>> When I open the log files on the Hadoop cluster I see this:
>>
>> 2017-10-20 13:17:33,250 INFO  com.bol.bugreports.Main - ----> Loading HBaseConfiguration: Zookeeper = *localhost*
>>
>>
>> and as a consequence
>>
>> 2017-10-20 13:17:33,368 INFO  org.apache.zookeeper.ClientCnxn - Opening socket connection to server localhost.localdomain/127.0.0.1:2181
>> 2017-10-20 13:17:33,369 WARN  org.apache.zookeeper.ClientCnxn - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
>> java.net.ConnectException: Connection refused
>>      at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>      at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>      at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>>      at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>> 2017-10-20 13:17:33,475 WARN  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
>>
>>
>>
>> The value 'localhost' (with client port 2181) is defined as the default
>> ZooKeeper quorum in the hbase-default.xml inside the HBase jar.
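To illustrate why the task ends up with localhost, here is a simplified, hypothetical model (not Hadoop's actual Configuration class): resources are applied as layers, later layers override earlier ones, and a resource that cannot be found contributes nothing, so the value from hbase-default.xml survives. The host names in main() are illustrative only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of layered configuration resolution.
public class LayeredConfigModel {
  private final List<Map<String, String>> layers = new ArrayList<>();

  // A null layer models "resource not found": it adds nothing.
  LayeredConfigModel addLayer(Map<String, String> layer) {
    if (layer != null) {
      layers.add(layer);
    }
    return this;
  }

  // The last layer that defines the key wins.
  String get(String key) {
    String value = null;
    for (Map<String, String> layer : layers) {
      if (layer.containsKey(key)) {
        value = layer.get(key);
      }
    }
    return value;
  }

  // Stand-in for the relevant defaults shipped in hbase-default.xml.
  static Map<String, String> defaults() {
    Map<String, String> m = new LinkedHashMap<>();
    m.put("hbase.zookeeper.quorum", "localhost");
    m.put("hbase.zookeeper.property.clientPort", "2181");
    return m;
  }

  public static void main(String[] args) {
    Map<String, String> site = new LinkedHashMap<>();
    site.put("hbase.zookeeper.quorum", "node1:2181,node2:2181,node3:2181"); // illustrative

    // Client side: hbase-site.xml is found, so it overrides the default.
    LayeredConfigModel client = new LayeredConfigModel().addLayer(defaults()).addLayer(site);
    // In the container: hbase-site.xml is not found, so the default stays.
    LayeredConfigModel task = new LayeredConfigModel().addLayer(defaults()).addLayer(null);

    System.out.println("client: " + client.get("hbase.zookeeper.quorum"));
    System.out.println("task:   " + task.get("hbase.zookeeper.quorum"));
  }
}
```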
>>
>> As a workaround I currently put this extra line in my code, which I know
>> is nasty but "works on my cluster":
>>
>> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>>
>>
>> What am I doing wrong?
>>
>> What is the right way to fix this?
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>
>
> --
> Best regards / Met vriendelijke groeten,
>
> Niels Basjes
>
>
>


-- 
Best regards / Met vriendelijke groeten,

Niels Basjes
