Is this /etc/hbase/conf/hbase-site.xml file present on all of the machines?
If yes, could you share your code?

> On 20 Oct 2017, at 16:29, Niels Basjes <ni...@basjes.nl> wrote:
> 
> I look at the log files from the Hadoop YARN web interface, i.e. actually
> looking in the jobmanager.log of the container running the Flink task.
> That is where I was able to find these messages.
> 
> I do the
>   hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
> in all places directly after HBaseConfiguration.create();
> That way I simply force the task to look on the actual Hadoop node for the
> same file it already loaded locally.
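> Concretely, the pattern is (a minimal sketch):
> 
> org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
> // Explicitly load the node-local hbase-site.xml, because the classpath inside
> // the YARN container apparently does not provide it.
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));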
> 
> The reason I'm suspecting Flink is that the client-side part of the Flink
> application does have the right setting, while the task/job actually running in
> the cluster does not have the same settings.
> So it seems that, in the transition into the cluster, the application for some
> reason does not copy everything it has available locally.
> 
> There is a very high probability I did something wrong, I'm just not seeing 
> it at this moment.
> 
> Niels
> 
> 
> 
> On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Hi,
> 
> What do you mean by saying:
> 
>> When I open the logfiles on the Hadoop cluster I see this:
> 
> 
> The error doesn't come from Flink? Where do you execute the following?
> 
> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
> 
> To me it seems like a problem with a misconfigured HBase and not
> something related to Flink.
> 
> Piotrek
> 
>> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl> wrote:
>> 
>> To facilitate you guys helping me, I put this test project on GitHub:
>> https://github.com/nielsbasjes/FlinkHBaseConnectProblem
>> 
>> Niels Basjes
>> 
>> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl> wrote:
>> Hi,
>> 
>> I have a Flink 1.3.2 application that I want to run on a Hadoop YARN
>> cluster, where I need to connect to HBase.
>> 
>> What I have:
>> 
>> In my environment:
>> HADOOP_CONF_DIR=/etc/hadoop/conf/
>> HBASE_CONF_DIR=/etc/hbase/conf/
>> HIVE_CONF_DIR=/etc/hive/conf/
>> YARN_CONF_DIR=/etc/hadoop/conf/
>> 
>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
>> hosts for HBase.
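>> 
>> For illustration, the relevant entry in that file looks like this (the
>> hostnames here are placeholders):
>> 
>> <configuration>
>>   <property>
>>     <name>hbase.zookeeper.quorum</name>
>>     <value>node1.example.com,node2.example.com,node3.example.com</value>
>>   </property>
>> </configuration>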
>> 
>> My test code is this:
>> import static java.nio.charset.StandardCharsets.UTF_8;
>> 
>> import org.apache.flink.addons.hbase.AbstractTableInputFormat;
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.hadoop.hbase.HBaseConfiguration;
>> import org.apache.hadoop.hbase.client.HTable;
>> import org.apache.hadoop.hbase.client.Result;
>> import org.apache.hadoop.hbase.client.Scan;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>> 
>> public class Main {
>>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>> 
>>   public static void main(String[] args) throws Exception {
>>     printZookeeperConfig();
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>     env.createInput(new HBaseSource()).print();
>>     env.execute("HBase config problem");
>>   }
>> 
>>   public static void printZookeeperConfig() {
>>     String zookeeper = HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>>     LOG.info("----> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>>   }
>> 
>>   public static class HBaseSource extends AbstractTableInputFormat<String> {
>>     @Override
>>     public void configure(org.apache.flink.configuration.Configuration parameters) {
>>       table = createTable();
>>       if (table != null) {
>>         scan = getScanner();
>>       }
>>     }
>> 
>>     private HTable createTable() {
>>       LOG.info("Initializing HBaseConfiguration");
>>       // Uses the hbase-default.xml and hbase-site.xml files found on the classpath
>>       org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>>       printZookeeperConfig();
>> 
>>       try {
>>         return new HTable(hConf, getTableName());
>>       } catch (Exception e) {
>>         LOG.error("Error instantiating a new HTable instance", e);
>>       }
>>       return null;
>>     }
>> 
>>     @Override
>>     public String getTableName() {
>>       return "bugs:flink";
>>     }
>> 
>>     @Override
>>     protected String mapResultToOutType(Result result) {
>>       return new String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
>>     }
>> 
>>     @Override
>>     protected Scan getScanner() {
>>       return new Scan();
>>     }
>>   }
>> 
>> }
>> 
>> I run this application with this command on my YARN cluster (note: first
>> starting a yarn-cluster and then submitting the job yields the same result).
>> 
>> flink \
>>     run \
>>     -m yarn-cluster \
>>     --yarncontainer 1 \
>>     --yarnname "Flink on Yarn HBase problem" \
>>     --yarnslots                     1     \
>>     --yarnjobManagerMemory          4000  \
>>     --yarntaskManagerMemory         4000  \
>>     --yarnstreaming                       \
>>     target/flink-hbase-connect-1.0-SNAPSHOT.jar
>> 
>> Now in the client-side logfile
>> /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see:
>> 1) The classpath actually contains /etc/hbase/conf/, both near the start and at
>> the end.
>> 2) The zookeeper settings of my experimental environment have been picked up
>> by the software:
>> 
>> 2017-10-20 11:17:23,973 INFO  com.bol.bugreports.Main                                - ----> Loading HBaseConfiguration: Zookeeper = node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181
>> 
>> When I open the logfiles on the Hadoop cluster I see this:
>> 
>> 2017-10-20 13:17:33,250 INFO  com.bol.bugreports.Main                                - ----> Loading HBaseConfiguration: Zookeeper = localhost
>> 
>> and as a consequence
>> 
>> 2017-10-20 13:17:33,368 INFO  org.apache.zookeeper.ClientCnxn                        - Opening socket connection to server localhost.localdomain/127.0.0.1:2181
>> 2017-10-20 13:17:33,369 WARN  org.apache.zookeeper.ClientCnxn                        - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
>> java.net.ConnectException: Connection refused
>>      at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>      at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>      at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>>      at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>> 2017-10-20 13:17:33,475 WARN  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
>> 
>> 
>> The value 'localhost:2181' is defined within the HBase jar, in
>> hbase-default.xml, as the default value for the zookeeper quorum.
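>> 
>> This fallback is easy to see with a tiny sketch (assuming no hbase-site.xml is
>> on the classpath, as inside the container):
>> 
>> // hbase-default.xml ships inside the HBase jar; without a site file on the
>> // classpath, the quorum resolves to its built-in default.
>> org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
>> LOG.info("Quorum = {}", conf.get("hbase.zookeeper.quorum"));  // prints "localhost"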
>> 
>> As a workaround I currently put this extra line in my code, which I know is
>> nasty but "works on my cluster":
>> 
>> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> 
>> What am I doing wrong?
>> 
>> What is the right way to fix this?
>> 
>> -- 
>> Best regards / Met vriendelijke groeten,
>> 
>> Niels Basjes
>> 
>> 
>> 
>> -- 
>> Best regards / Met vriendelijke groeten,
>> 
>> Niels Basjes
> 
> 
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
