Till, do you have some idea what is going on? I do not see any meaningful 
difference between Niels's code and HBaseWriteStreamExample.java. There is also a 
very similar issue on the mailing list: “Flink can't read hdfs namenode 
logical url”.

Piotrek

> On 22 Oct 2017, at 12:56, Niels Basjes <ni...@basjes.nl> wrote:
> 
> Hi,
> 
> Yes, all nodes have the same /etc/hbase/conf/hbase-site.xml, which contains 
> the correct settings for HBase to find zookeeper.
> That is why adding that file as an additional resource to the configuration 
> works.
> I have created a very simple project that reproduces the problem on my setup:
> https://github.com/nielsbasjes/FlinkHBaseConnectProblem
> 
> Niels Basjes
> 
> 
> On Fri, Oct 20, 2017 at 6:54 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
> Is this /etc/hbase/conf/hbase-site.xml file present on all of the 
> machines? If yes, could you share your code?
> 
>> On 20 Oct 2017, at 16:29, Niels Basjes <ni...@basjes.nl> wrote:
>> 
>> I looked at the logfiles via the Hadoop YARN web interface, i.e. actually 
>> looking in the jobmanager.log of the container running the Flink task.
>> That is where I was able to find these messages.
>> 
>> I do the
>>   hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> in all places, directly after HBaseConfiguration.create();
>> That way I simply force the task to look on the actual Hadoop node for the 
>> same file it already loaded locally.
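>> 
>> For illustration, a minimal sketch of that pattern (the Path is 
>> org.apache.hadoop.fs.Path; the file location is specific to my cluster):
>> 
>>   private org.apache.hadoop.conf.Configuration createHBaseConfig() {
>>     // Starts from hbase-default.xml plus whatever hbase-site.xml is on the classpath
>>     org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
>>     // Explicitly re-read the node-local hbase-site.xml so the real zookeeper
>>     // quorum overrides the "localhost" default from hbase-default.xml
>>     hbaseConfig.addResource(new org.apache.hadoop.fs.Path("file:/etc/hbase/conf/hbase-site.xml"));
>>     return hbaseConfig;
>>   }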
>> 
>> The reason I'm suspecting Flink is that the client-side part of the Flink 
>> application does have the right settings, yet the task/job actually running in 
>> the cluster does not.
>> So it seems that in the transition into the cluster the application, for some 
>> reason, does not copy everything it has available locally.
>> 
>> There is a very high probability I did something wrong, I'm just not seeing 
>> it at this moment.
>> 
>> Niels
>> 
>> 
>> 
>> On Fri, Oct 20, 2017 at 2:53 PM, Piotr Nowojski <pi...@data-artisans.com> wrote:
>> Hi,
>> 
>> What do you mean by saying:
>> 
>>> When I open the logfiles on the Hadoop cluster I see this:
>> 
>> 
>> The error doesn’t come from Flink? Where do you execute 
>> 
>> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>> 
>> ?
>> 
>> To me it seems like it is a problem with misconfigured HBase and not 
>> something related to Flink.
>> 
>> Piotrek
>> 
>>> On 20 Oct 2017, at 13:44, Niels Basjes <ni...@basjes.nl> wrote:
>>> 
>>> To facilitate you guys helping me I put this test project on github:
>>> https://github.com/nielsbasjes/FlinkHBaseConnectProblem
>>> 
>>> Niels Basjes
>>> 
>>> On Fri, Oct 20, 2017 at 1:32 PM, Niels Basjes <ni...@basjes.nl> wrote:
>>> Hi,
>>> 
>>> I have a Flink 1.3.2 application that I want to run on a Hadoop YARN 
>>> cluster, where I need to connect to HBase.
>>> 
>>> What I have:
>>> 
>>> In my environment:
>>> HADOOP_CONF_DIR=/etc/hadoop/conf/
>>> HBASE_CONF_DIR=/etc/hbase/conf/
>>> HIVE_CONF_DIR=/etc/hive/conf/
>>> YARN_CONF_DIR=/etc/hadoop/conf/
>>> 
>>> In /etc/hbase/conf/hbase-site.xml I have correctly defined the zookeeper 
>>> hosts for HBase.
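>>> 
>>> For reference, the relevant entry in that file looks roughly like this (the 
>>> hostnames are of course specific to my cluster):
>>> 
>>>   <property>
>>>     <name>hbase.zookeeper.quorum</name>
>>>     <value>node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181</value>
>>>   </property>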
>>> 
>>> My test code is this:
>>> import static java.nio.charset.StandardCharsets.UTF_8;
>>> 
>>> import org.apache.flink.addons.hbase.AbstractTableInputFormat;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.hadoop.hbase.HBaseConfiguration;
>>> import org.apache.hadoop.hbase.client.HTable;
>>> import org.apache.hadoop.hbase.client.Result;
>>> import org.apache.hadoop.hbase.client.Scan;
>>> import org.slf4j.Logger;
>>> import org.slf4j.LoggerFactory;
>>> 
>>> public class Main {
>>>   private static final Logger LOG = LoggerFactory.getLogger(Main.class);
>>> 
>>>   public static void main(String[] args) throws Exception {
>>>     printZookeeperConfig();
>>>     final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
>>>     env.createInput(new HBaseSource()).print();
>>>     env.execute("HBase config problem");
>>>   }
>>> 
>>>   public static void printZookeeperConfig() {
>>>     String zookeeper = HBaseConfiguration.create().get("hbase.zookeeper.quorum");
>>>     LOG.info("----> Loading HBaseConfiguration: Zookeeper = {}", zookeeper);
>>>   }
>>> 
>>>   public static class HBaseSource extends AbstractTableInputFormat<String> {
>>>     @Override
>>>     public void configure(org.apache.flink.configuration.Configuration parameters) {
>>>       table = createTable();
>>>       if (table != null) {
>>>         scan = getScanner();
>>>       }
>>>     }
>>> 
>>>     private HTable createTable() {
>>>       LOG.info("Initializing HBaseConfiguration");
>>>       // Uses the hbase-default.xml and hbase-site.xml files found on the classpath
>>>       org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>>>       printZookeeperConfig();
>>> 
>>>       try {
>>>         return new HTable(hConf, getTableName());
>>>       } catch (Exception e) {
>>>         LOG.error("Error instantiating a new HTable instance", e);
>>>       }
>>>       return null;
>>>     }
>>> 
>>>     @Override
>>>     public String getTableName() {
>>>       return "bugs:flink";
>>>     }
>>> 
>>>     @Override
>>>     protected String mapResultToOutType(Result result) {
>>>       return new String(result.getFamilyMap("v".getBytes(UTF_8)).get("column".getBytes(UTF_8)));
>>>     }
>>> 
>>>     @Override
>>>     protected Scan getScanner() {
>>>       return new Scan();
>>>     }
>>>   }
>>> }
>>> 
>>> I run this application with the command below on my YARN cluster (note: first 
>>> starting a yarn-cluster and then submitting the job yields the same result).
>>> 
>>> flink \
>>>     run \
>>>     -m yarn-cluster \
>>>     --yarncontainer 1 \
>>>     --yarnname "Flink on Yarn HBase problem" \
>>>     --yarnslots                     1     \
>>>     --yarnjobManagerMemory          4000  \
>>>     --yarntaskManagerMemory         4000  \
>>>     --yarnstreaming                       \
>>>     target/flink-hbase-connect-1.0-SNAPSHOT.jar
>>> 
>>> Now in the client-side logfile 
>>> /usr/local/flink-1.3.2/log/flink--client-80d2d21b10e0.log I see:
>>> 1) The classpath actually contains /etc/hbase/conf/ both near the start and at 
>>> the end.
>>> 2) The zookeeper settings of my experimental environment have been picked up 
>>> by the software:
>>> 
>>> 2017-10-20 11:17:23,973 INFO  com.bol.bugreports.Main                                       - ----> Loading HBaseConfiguration: Zookeeper = node1.kluster.local.nl.bol.com:2181,node2.kluster.local.nl.bol.com:2181,node3.kluster.local.nl.bol.com:2181
>>> 
>>> When I open the logfiles on the Hadoop cluster I see this:
>>> 
>>> 2017-10-20 13:17:33,250 INFO  com.bol.bugreports.Main                                       - ----> Loading HBaseConfiguration: Zookeeper = localhost
>>> 
>>> and as a consequence
>>> 
>>> 2017-10-20 13:17:33,368 INFO  org.apache.zookeeper.ClientCnxn                               - Opening socket connection to server localhost.localdomain/127.0.0.1:2181
>>> 2017-10-20 13:17:33,369 WARN  org.apache.zookeeper.ClientCnxn                               - Session 0x0 for server null, unexpected error, closing socket connection and attempting reconnect
>>> java.net.ConnectException: Connection refused
>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>     at org.apache.zookeeper.ClientCnxnSocketNIO.doTransport(ClientCnxnSocketNIO.java:361)
>>>     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1081)
>>> 2017-10-20 13:17:33,475 WARN  org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper        - Possibly transient ZooKeeper, quorum=localhost:2181, exception=org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /hbase/hbaseid
>>> 
>>> 
>>> The value 'localhost:2181' is defined within the HBase jar, in 
>>> hbase-default.xml, as the default value for the zookeeper nodes.
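>>> 
>>> As a side note, Hadoop's Configuration can report where a value came from, 
>>> which makes this easy to verify; a small sketch (assuming the 
>>> getPropertySources API of Hadoop 2.x):
>>> 
>>>   org.apache.hadoop.conf.Configuration hConf = HBaseConfiguration.create();
>>>   // Lists the resources that set this key: only [hbase-default.xml] when the
>>>   // bundled defaults win, or ending in [hbase-site.xml] when the cluster file
>>>   // was actually picked up.
>>>   String[] sources = hConf.getPropertySources("hbase.zookeeper.quorum");
>>>   LOG.info("hbase.zookeeper.quorum was set from: {}", java.util.Arrays.toString(sources));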
>>> 
>>> As a workaround I currently put this extra line in my code, which I know is 
>>> nasty but "works on my cluster":
>>> hbaseConfig.addResource(new Path("file:/etc/hbase/conf/hbase-site.xml"));
>>> 
>>> What am I doing wrong?
>>> 
>>> What is the right way to fix this?
>>> 
>>> -- 
>>> Best regards / Met vriendelijke groeten,
>>> 
>>> Niels Basjes
>>> 
>>> 
>>> 
>>> -- 
>>> Best regards / Met vriendelijke groeten,
>>> 
>>> Niels Basjes
>> 
>> 
>> 
>> 
>> -- 
>> Best regards / Met vriendelijke groeten,
>> 
>> Niels Basjes
> 
> 
> 
> 
> -- 
> Best regards / Met vriendelijke groeten,
> 
> Niels Basjes
