[ 
https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14395710#comment-14395710
 ] 

ASF GitHub Bot commented on FLINK-1828:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/571#discussion_r27768585
  
    --- Diff: 
flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java
 ---
    @@ -0,0 +1,197 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.addons.hbase.example;
    +
    +import org.apache.flink.api.common.functions.FlatMapFunction;
    +import org.apache.flink.api.common.functions.RichMapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.util.Collector;
    +import org.apache.hadoop.hbase.client.Mutation;
    +import org.apache.hadoop.hbase.client.Put;
    +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
    +import org.apache.hadoop.hbase.util.Bytes;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapreduce.Job;
    +
    +@SuppressWarnings("serial")
    +public class HBaseWriteExample {
    +   
    +   // 
*************************************************************************
    +   //     PROGRAM
    +   // 
*************************************************************************
    +   
    +   public static void main(String[] args) throws Exception {
    +
    +           if(!parseParameters(args)) {
    +                   return;
    +           }
    +           
    +           // set up the execution environment
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
    +           
    +           // get input data
    +           DataSet<String> text = getTextDataSet(env);
    +           
    +           DataSet<Tuple2<String, Integer>> counts = 
    +                           // split up the lines in pairs (2-tuples) 
containing: (word,1)
    +                           text.flatMap(new Tokenizer())
    +                           // group by the tuple field "0" and sum up 
tuple field "1"
    +                           .groupBy(0)
    +                           .sum(1);
    +
    +           // emit result
    +//         if(fileOutput) {
    +                   Job job = Job.getInstance();
    +                   
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
    +                   // TODO is "mapred.output.dir" really useful?
    +                   
job.getConfiguration().set("mapred.output.dir","/tmp/test");
    +                   counts.map(new RichMapFunction <Tuple2<String,Integer>, 
Tuple2<Text,Mutation>>() {
    +                           private final byte[] CF_SOME = 
Bytes.toBytes("test-column");
    +                           private final byte[] Q_SOME = 
Bytes.toBytes("value");
    +                           private transient Tuple2<Text, Mutation> reuse;
    +
    +                           @Override
    +                           public void open(Configuration parameters) 
throws Exception {
    +                                   super.open(parameters);
    +                                   reuse = new Tuple2<Text, Mutation>();
    +                           }
    +
    +                           @Override
    +                           public Tuple2<Text, Mutation> 
map(Tuple2<String, Integer> t) throws Exception {
    +                                   reuse.f0 = new Text(t.f0);
    +                                   Put put = new Put(t.f0.getBytes());
    +                                   put.add(CF_SOME, Q_SOME, 
Bytes.toBytes(t.f1));
    +                                   reuse.f1 = put;
    +                                   return reuse;
    +                           }
    +                   }).output(new HadoopOutputFormat<Text, Mutation>(new 
TableOutputFormat<Text>(), job));
    +//         } else {
    --- End diff --
    
    The commented-out `else` branch is not necessary.


> Impossible to output data to an HBase table
> -------------------------------------------
>
>                 Key: FLINK-1828
>                 URL: https://issues.apache.org/jira/browse/FLINK-1828
>             Project: Flink
>          Issue Type: Bug
>          Components: Hadoop Compatibility
>    Affects Versions: 0.9
>            Reporter: Flavio Pompermaier
>              Labels: hadoop, hbase
>             Fix For: 0.9
>
>
> Right now it is not possible to use the HBase TableOutputFormat as an output format 
> because Configurable.setConf is not called in the configure() method of 
> HadoopOutputFormatBase.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to