[ https://issues.apache.org/jira/browse/FLINK-1828?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14395710#comment-14395710 ]
ASF GitHub Bot commented on FLINK-1828: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/571#discussion_r27768585 --- Diff: flink-staging/flink-hbase/src/test/java/org/apache/flink/addons/hbase/example/HBaseWriteExample.java --- @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.addons.hbase.example; + +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichMapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormat; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.util.Collector; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; + +@SuppressWarnings("serial") +public class HBaseWriteExample { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + + if(!parseParameters(args)) { + return; + } + + // set up the execution environment + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + // get input data + DataSet<String> text = getTextDataSet(env); + + DataSet<Tuple2<String, Integer>> counts = + // split up the lines in pairs (2-tuples) containing: (word,1) + text.flatMap(new Tokenizer()) + // group by the tuple field "0" and sum up tuple field "1" + .groupBy(0) + .sum(1); + + // emit result +// if(fileOutput) { + Job job = Job.getInstance(); + job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName); + // TODO is "mapred.output.dir" really useful? 
+ job.getConfiguration().set("mapred.output.dir","/tmp/test");
+ counts.map(new RichMapFunction <Tuple2<String,Integer>, Tuple2<Text,Mutation>>() {
+ private final byte[] CF_SOME = Bytes.toBytes("test-column");
+ private final byte[] Q_SOME = Bytes.toBytes("value");
+ private transient Tuple2<Text, Mutation> reuse;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ reuse = new Tuple2<Text, Mutation>();
+ }
+
+ @Override
+ public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
+ reuse.f0 = new Text(t.f0);
+ Put put = new Put(t.f0.getBytes());
+ put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
+ reuse.f1 = put;
+ return reuse;
+ }
+ }).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));
+// } else {
--- End diff --

`else` branch not necessary

> Impossible to output data to an HBase table
> -------------------------------------------
>
> Key: FLINK-1828
> URL: https://issues.apache.org/jira/browse/FLINK-1828
> Project: Flink
> Issue Type: Bug
> Components: Hadoop Compatibility
> Affects Versions: 0.9
> Reporter: Flavio Pompermaier
> Labels: hadoop, hbase
> Fix For: 0.9
>
>
> Right now it is not possible to use HBase TableOutputFormat as output format
> because Configurable.setConf is not called in the configure() method of the
> HadoopOutputFormatBase

--
This message was sent by Atlassian JIRA (v6.3.4#6332)