[FLINK-2900] [hadoop-compat] Remove Record API code from Hadoop Compat module
This closes #1293

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ff04baa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ff04baa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ff04baa

Branch: refs/heads/master
Commit: 2ff04baa649831fe66a7fa1f3d5b4d7adf632261
Parents: a8f1be9
Author: Fabian Hueske <fhue...@apache.org>
Authored: Thu Oct 22 21:12:15 2015 +0200
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Thu Oct 29 10:33:55 2015 +0100

----------------------------------------------------------------------
 .../mapred/record/HadoopDataSink.java            |  98 ----------
 .../mapred/record/HadoopDataSource.java          |  82 --------
 .../mapred/record/HadoopRecordInputFormat.java   | 174 ----------------
 .../mapred/record/HadoopRecordOutputFormat.java  | 156 ---------------
 .../datatypes/DefaultFlinkTypeConverter.java     |  95 ---------
 .../datatypes/DefaultHadoopTypeConverter.java    |  83 --------
 .../record/datatypes/FlinkTypeConverter.java     |  43 ----
 .../datatypes/HadoopFileOutputCommitter.java     | 196 -------------------
 .../record/datatypes/HadoopTypeConverter.java    |  42 ----
 .../datatypes/WritableComparableWrapper.java     |  40 ----
 .../record/datatypes/WritableWrapper.java        |  71 -------
 .../datatypes/WritableWrapperConverter.java      |  45 -----
 .../mapred/record/example/WordCount.java         | 184 -----------------
 .../example/WordCountWithOutputFormat.java       | 173 ----------------
 .../record/HadoopRecordInputOutputITCase.java    |  54 -----
 15 files changed, 1536 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java deleted file mode 100644 index 3b8064f..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSink.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.hadoopcompatibility.mapred.record; - -import java.util.Collections; -import java.util.List; - -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.java.record.operators.GenericDataSink; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter; -import org.apache.flink.types.Record; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.OutputFormat; - -/** - * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats. - * - * Example usage: - * <pre> - * HadoopDataSink out = new HadoopDataSink(new org.apache.hadoop.mapred.TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat",reducer, Text.class,IntWritable.class); - * org.apache.hadoop.mapred.TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output)); - * </pre> - * - * Note that it is possible to provide custom data type converter. - * - * The HadoopDataSink provides a default converter: {@link org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultFlinkTypeConverter} - **/ -public class HadoopDataSink<K,V> extends GenericDataSink { - - private static String DEFAULT_NAME = "<Unnamed Hadoop Data Sink>"; - - private JobConf jobConf; - - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) { - this(hadoopFormat, jobConf, name, Collections.<Operator<Record>>singletonList(input), conv, keyClass, valueClass); - } - - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) { - this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass); - } - - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) { - this(hadoopFormat, jobConf, DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass); - } - - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, Operator<Record> input, Class<K> keyClass, Class<V> valueClass) { - this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass); - } - - - - @SuppressWarnings("deprecation") - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, FlinkTypeConverter<K,V> conv, Class<K> keyClass, Class<V> valueClass) { - super(new HadoopRecordOutputFormat<K,V>(hadoopFormat, jobConf, conv),input, name); - - if (hadoopFormat == null || jobConf == null) { - throw new NullPointerException(); - } - - this.name = name; - this.jobConf = jobConf; - jobConf.setOutputKeyClass(keyClass); - jobConf.setOutputValueClass(valueClass); - } - - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, String name, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) { - this(hadoopFormat, jobConf, name, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass); - } - - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, JobConf jobConf, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) { - this(hadoopFormat, jobConf, DEFAULT_NAME, input, new 
DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass); - } - - public HadoopDataSink(OutputFormat<K,V> hadoopFormat, List<Operator<Record>> input, Class<K> keyClass, Class<V> valueClass) { - this(hadoopFormat, new JobConf(), DEFAULT_NAME, input, new DefaultFlinkTypeConverter<K, V>(keyClass, valueClass), keyClass, valueClass); - } - - public JobConf getJobConf() { - return this.jobConf; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java deleted file mode 100644 index 508f069..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopDataSource.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapred.record; - - -import org.apache.flink.api.java.record.operators.GenericDataSource; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.DefaultHadoopTypeConverter; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter; -import org.apache.hadoop.mapred.InputFormat; -import org.apache.hadoop.mapred.JobConf; - -/** - * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats. - * - * Example usage: - * <pre> - * HadoopDataSource source = new HadoopDataSource(new org.apache.hadoop.mapred.TextInputFormat(), new JobConf(), "Input Lines"); - * org.apache.hadoop.mapred.TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput)); - * </pre> - * - * Note that it is possible to provide custom data type converter. - * - * The HadoopDataSource provides two different standard converters: - * * WritableWrapperConverter: Converts Hadoop Types to a record that contains a WritableComparableWrapper (key) and a WritableWrapper - * * DefaultHadoopTypeConverter: Converts the standard hadoop types (longWritable, Text) to Flinks's {@link org.apache.flink.types.Value} types. 
- */ -public class HadoopDataSource<K,V> extends GenericDataSource<HadoopRecordInputFormat<K,V>> { - - private static String DEFAULT_NAME = "<Unnamed Hadoop Data Source>"; - - private JobConf jobConf; - - /** - * - * @param hadoopFormat Implementation of a Hadoop input format - * @param jobConf JobConf object (Hadoop) - * @param name Name of the DataSource - * @param conv Definition of a custom type converter {@link DefaultHadoopTypeConverter}. - */ - public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter<K,V> conv) { - super(new HadoopRecordInputFormat<K,V>(hadoopFormat, jobConf, conv),name); - - if (hadoopFormat == null || jobConf == null || conv == null) { - throw new NullPointerException(); - } - - this.name = name; - this.jobConf = jobConf; - } - - public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf, String name) { - this(hadoopFormat, jobConf, name, new DefaultHadoopTypeConverter<K,V>() ); - } - public HadoopDataSource(InputFormat<K,V> hadoopFormat, JobConf jobConf) { - this(hadoopFormat, jobConf, DEFAULT_NAME); - } - - public HadoopDataSource(InputFormat<K,V> hadoopFormat) { - this(hadoopFormat, new JobConf(), DEFAULT_NAME); - } - - public JobConf getJobConf() { - return this.jobConf; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java deleted file mode 100644 index edcc43b..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapred.record; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.api.common.io.DefaultInputSplitAssigner; -import org.apache.flink.api.common.io.InputFormat; -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.InputSplitAssigner; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter; -import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopInputSplit; -import org.apache.flink.types.Record; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordReader; -import org.apache.hadoop.util.ReflectionUtils; - -public class HadoopRecordInputFormat<K, V> implements InputFormat<Record, HadoopInputSplit> { - - private static final long serialVersionUID = 1L; - - public org.apache.hadoop.mapred.InputFormat<K, V> hadoopInputFormat; - public HadoopTypeConverter<K,V> converter; - private String hadoopInputFormatName; - public JobConf jobConf; - public transient K key; - public transient V value; - public RecordReader<K, V> recordReader; - private boolean fetched = false; - private boolean hasNext; - - public HadoopRecordInputFormat() { - super(); - } - - public HadoopRecordInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat, JobConf job, HadoopTypeConverter<K,V> conv) { - super(); - this.hadoopInputFormat = hadoopInputFormat; - this.hadoopInputFormatName = hadoopInputFormat.getClass().getName(); - this.converter = conv; - HadoopUtils.mergeHadoopConf(job); - this.jobConf = job; - } - - @Override - public void configure(Configuration parameters) { - - } - - @Override - public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException { - return null; - } - - @Override - public HadoopInputSplit[] createInputSplits(int minNumSplits) - throws IOException { - org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits); - HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; - for(int i=0;i<splitArray.length;i++){ - hiSplit[i] = new HadoopInputSplit(i, splitArray[i], jobConf); - } - return hiSplit; - } - - @Override - public InputSplitAssigner getInputSplitAssigner(HadoopInputSplit[] inputSplits) { - return new DefaultInputSplitAssigner(inputSplits); - } - - @Override - public void open(HadoopInputSplit split) throws IOException { - this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter()); - key = this.recordReader.createKey(); - value = this.recordReader.createValue(); - this.fetched = false; - } - - private void fetchNext() throws IOException { - hasNext = this.recordReader.next(key, value); - fetched = true; - } - - @Override - public boolean reachedEnd() throws IOException { - if(!fetched) { - fetchNext(); - } - return !hasNext; - } - - @Override - public Record nextRecord(Record record) throws IOException { - if(!fetched) { - fetchNext(); - } - if(!hasNext) { - return null; - } - converter.convert(record, key, value); - fetched = false; - return record; - } - - @Override - public void close() throws IOException { - this.recordReader.close(); - } - - /** - * Custom serialization methods. 
- * @see "http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html" - */ - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(hadoopInputFormatName); - jobConf.write(out); - out.writeObject(converter); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - hadoopInputFormatName = in.readUTF(); - if(jobConf == null) { - jobConf = new JobConf(); - } - jobConf.readFields(in); - try { - this.hadoopInputFormat = (org.apache.hadoop.mapred.InputFormat<K,V>) Class.forName(this.hadoopInputFormatName).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop input format", e); - } - ReflectionUtils.setConf(hadoopInputFormat, jobConf); - converter = (HadoopTypeConverter<K,V>) in.readObject(); - } - - public void setJobConf(JobConf job) { - this.jobConf = job; - } - - - public org.apache.hadoop.mapred.InputFormat<K,V> getHadoopInputFormat() { - return hadoopInputFormat; - } - - public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat<K,V> hadoopInputFormat) { - this.hadoopInputFormat = hadoopInputFormat; - } - - public JobConf getJobConf() { - return jobConf; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java deleted file mode 100644 index e519062..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapred.record; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; - -import org.apache.flink.api.common.io.OutputFormat; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.FlinkTypeConverter; -import org.apache.flink.api.java.hadoop.mapred.utils.HadoopUtils; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyProgressable; -import org.apache.flink.api.java.hadoop.mapred.wrapper.HadoopDummyReporter; -import org.apache.flink.types.Record; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.RecordWriter; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.util.ReflectionUtils; - - -public class HadoopRecordOutputFormat<K,V> implements OutputFormat<Record> { - - private static final long serialVersionUID = 1L; - - public JobConf jobConf; - - public org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat; - - private String hadoopOutputFormatName; - - public RecordWriter<K,V> recordWriter; - - public FlinkTypeConverter<K,V> converter; - - public HadoopFileOutputCommitter fileOutputCommitterWrapper; - - public HadoopRecordOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopFormat, JobConf job, FlinkTypeConverter<K,V> conv) { - super(); - this.hadoopOutputFormat = hadoopFormat; - this.hadoopOutputFormatName = hadoopFormat.getClass().getName(); - this.converter = conv; - this.fileOutputCommitterWrapper = new HadoopFileOutputCommitter(); - HadoopUtils.mergeHadoopConf(job); - this.jobConf = job; - } - - @Override - public void configure(Configuration parameters) { - } - - /** - * create the temporary output file for hadoop RecordWriter. - * @param taskNumber The number of the parallel instance. - * @param numTasks The number of parallel tasks. - * @throws IOException - */ - @Override - public void open(int taskNumber, int numTasks) throws IOException { - this.fileOutputCommitterWrapper.setupJob(this.jobConf); - if (Integer.toString(taskNumber + 1).length() <= 6) { - this.jobConf.set("mapred.task.id", "attempt__0000_r_" + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + Integer.toString(taskNumber + 1) + "_0"); - //compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1 - this.jobConf.set("mapreduce.task.output.dir", this.fileOutputCommitterWrapper.getTempTaskOutputPath(this.jobConf,TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))).toString()); - } else { - throw new IOException("task id too large"); - } - this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable()); - } - - - @Override - public void writeRecord(Record record) throws IOException { - K key = this.converter.convertKey(record); - V value = this.converter.convertValue(record); - this.recordWriter.write(key, value); - } - - /** - * commit the task by moving the output file out from the temporary directory. 
- * @throws IOException - */ - @Override - public void close() throws IOException { - this.recordWriter.close(new HadoopDummyReporter()); - if (this.fileOutputCommitterWrapper.needsTaskCommit(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")))) { - this.fileOutputCommitterWrapper.commitTask(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))); - } - //TODO: commitjob when all the tasks are finished - } - - - /** - * Custom serialization methods. - * @see "http://docs.oracle.com/javase/7/docs/api/java/io/Serializable.html" - */ - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeUTF(hadoopOutputFormatName); - jobConf.write(out); - out.writeObject(converter); - out.writeObject(fileOutputCommitterWrapper); - } - - @SuppressWarnings("unchecked") - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - hadoopOutputFormatName = in.readUTF(); - if(jobConf == null) { - jobConf = new JobConf(); - } - jobConf.readFields(in); - try { - this.hadoopOutputFormat = (org.apache.hadoop.mapred.OutputFormat<K,V>) Class.forName(this.hadoopOutputFormatName).newInstance(); - } catch (Exception e) { - throw new RuntimeException("Unable to instantiate the hadoop output format", e); - } - ReflectionUtils.setConf(hadoopOutputFormat, jobConf); - converter = (FlinkTypeConverter<K,V>) in.readObject(); - fileOutputCommitterWrapper = (HadoopFileOutputCommitter) in.readObject(); - } - - - public void setJobConf(JobConf job) { - this.jobConf = job; - } - - public JobConf getJobConf() { - return jobConf; - } - - public org.apache.hadoop.mapred.OutputFormat<K,V> getHadoopOutputFormat() { - return hadoopOutputFormat; - } - - public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat<K,V> hadoopOutputFormat) { - this.hadoopOutputFormat = hadoopOutputFormat; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java deleted file mode 100644 index 9d37988..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultFlinkTypeConverter.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import org.apache.flink.types.BooleanValue; -import org.apache.flink.types.ByteValue; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; - -/** - * Convert Flink Record into the default hadoop writables. - */ -public class DefaultFlinkTypeConverter<K,V> implements FlinkTypeConverter<K,V> { - private static final long serialVersionUID = 1L; - - private Class<K> keyClass; - private Class<V> valueClass; - - public DefaultFlinkTypeConverter(Class<K> keyClass, Class<V> valueClass) { - this.keyClass= keyClass; - this.valueClass = valueClass; - } - @Override - public K convertKey(Record flinkRecord) { - if(flinkRecord.getNumFields() > 0) { - return convert(flinkRecord, 0, this.keyClass); - } else { - return null; - } - } - - @Override - public V convertValue(Record flinkRecord) { - if(flinkRecord.getNumFields() > 1) { - return convert(flinkRecord, 1, this.valueClass); - } else { - return null; - } - } - - @SuppressWarnings("unchecked") - private<T> T convert(Record flinkType, int pos, Class<T> hadoopType) { - if(hadoopType == LongWritable.class ) { - return (T) new LongWritable((flinkType.getField(pos, LongValue.class)).getValue()); - } - if(hadoopType == org.apache.hadoop.io.Text.class) { - return (T) new Text((flinkType.getField(pos, StringValue.class)).getValue()); - } - if(hadoopType == org.apache.hadoop.io.IntWritable.class) { - return (T) new IntWritable((flinkType.getField(pos, IntValue.class)).getValue()); - } - if(hadoopType == org.apache.hadoop.io.FloatWritable.class) { - return (T) new FloatWritable((flinkType.getField(pos, FloatValue.class)).getValue()); - } - if(hadoopType == org.apache.hadoop.io.DoubleWritable.class) { - return (T) new DoubleWritable((flinkType.getField(pos, DoubleValue.class)).getValue()); - } - if(hadoopType == org.apache.hadoop.io.BooleanWritable.class) { - return (T) new BooleanWritable((flinkType.getField(pos, BooleanValue.class)).getValue()); - } - if(hadoopType == org.apache.hadoop.io.ByteWritable.class) { - return (T) new ByteWritable((flinkType.getField(pos, ByteValue.class)).getValue()); - } - - throw new RuntimeException("Unable to convert Flink type ("+flinkType.getClass().getCanonicalName()+") to Hadoop."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java deleted file mode 100644 index 6ed670a..0000000 --- 
a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import org.apache.flink.types.BooleanValue; -import org.apache.flink.types.ByteValue; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.FloatValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.NullValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.types.Value; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Text; - - -/** - * Converter for the default hadoop writables. - * Key will be in field 0, Value in field 1 of a Record. 
- */ -public class DefaultHadoopTypeConverter<K, V> implements HadoopTypeConverter<K, V> { - private static final long serialVersionUID = 1L; - - @Override - public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) { - flinkRecord.setField(0, convert(hadoopKey)); - flinkRecord.setField(1, convert(hadoopValue)); - } - - protected Value convert(Object hadoopType) { - if(hadoopType instanceof org.apache.hadoop.io.LongWritable ) { - return new LongValue(((LongWritable)hadoopType).get()); - } - if(hadoopType instanceof org.apache.hadoop.io.Text) { - return new StringValue(((Text)hadoopType).toString()); - } - if(hadoopType instanceof org.apache.hadoop.io.IntWritable) { - return new IntValue(((IntWritable)hadoopType).get()); - } - if(hadoopType instanceof org.apache.hadoop.io.FloatWritable) { - return new FloatValue(((FloatWritable)hadoopType).get()); - } - if(hadoopType instanceof org.apache.hadoop.io.DoubleWritable) { - return new DoubleValue(((DoubleWritable)hadoopType).get()); - } - if(hadoopType instanceof org.apache.hadoop.io.BooleanWritable) { - return new BooleanValue(((BooleanWritable)hadoopType).get()); - } - if(hadoopType instanceof org.apache.hadoop.io.ByteWritable) { - return new ByteValue(((ByteWritable)hadoopType).get()); - } - if (hadoopType instanceof NullWritable) { - return NullValue.getInstance(); - } - - throw new RuntimeException("Unable to convert Hadoop type ("+hadoopType.getClass().getCanonicalName()+") to a Flink data type."); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java deleted file mode 100644 index 3c14a86..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/FlinkTypeConverter.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import java.io.Serializable; - -import org.apache.flink.types.Record; - -/** - * An interface describing a class that is able to - * convert Flink's Record into Hadoop types model. - * - * The converter must be Serializable. - * - * Flink provides a DefaultFlinkTypeConverter. Custom implementations should - * chain the type converters. 
- */ -public interface FlinkTypeConverter<K,V> extends Serializable { - - /** - * Convert a Flink type to a Hadoop type. - */ - public K convertKey(Record record); - - public V convertValue(Record record); -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java deleted file mode 100644 index ce4955c..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java +++ /dev/null @@ -1,196 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import java.io.IOException; -import java.io.Serializable; -import java.net.URI; - -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.FileOutputCommitter; -import org.apache.hadoop.mapred.FileOutputFormat; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TaskAttemptID; -import org.apache.hadoop.util.StringUtils; - -/** - * Hadoop 1.2.1 {@link org.apache.hadoop.mapred.FileOutputCommitter} takes {@link org.apache.hadoop.mapred.JobContext} - * as input parameter. However JobContext class is package private, and in Hadoop 2.2.0 it's public. - * This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks. 
- */ -public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable { - - private static final long serialVersionUID = 1L; - - static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = - "mapreduce.fileoutputcommitter.marksuccessfuljobs"; - - public void setupJob(JobConf conf) throws IOException { - Path outputPath = FileOutputFormat.getOutputPath(conf); - if (outputPath != null) { - Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); - FileSystem fileSys = tmpDir.getFileSystem(conf); - if (!fileSys.mkdirs(tmpDir)) { - LOG.error("Mkdirs failed to create " + tmpDir.toString()); - } - } - } - - private static boolean getOutputDirMarking(JobConf conf) { - return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true); - } - - private void markSuccessfulOutputDir(JobConf conf) - throws IOException { - Path outputPath = FileOutputFormat.getOutputPath(conf); - if (outputPath != null) { - FileSystem fileSys = outputPath.getFileSystem(conf); - // create a file in the folder to mark it - if (fileSys.exists(outputPath)) { - Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME); - fileSys.create(filePath).close(); - } - } - } - - private Path getFinalPath(Path jobOutputDir, Path taskOutput, - Path taskOutputPath) throws IOException { - URI taskOutputUri = taskOutput.toUri(); - URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri); - if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput - throw new IOException("Can not get the relative path: base = " + - taskOutputPath + " child = " + taskOutput); - } - if (relativePath.getPath().length() > 0) { - return new Path(jobOutputDir, relativePath.getPath()); - } else { - return jobOutputDir; - } - } - private void moveTaskOutputs(JobConf conf, TaskAttemptID taskAttemptID, - FileSystem fs, - Path jobOutputDir, - Path taskOutput) - throws IOException { - if (fs.isFile(taskOutput)) { - Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, - getTempTaskOutputPath(conf, taskAttemptID)); - if (!fs.rename(taskOutput, finalOutputPath)) { - if (!fs.delete(finalOutputPath, true)) { - throw new IOException("Failed to delete earlier output of task: " + - taskAttemptID); - } - if (!fs.rename(taskOutput, finalOutputPath)) { - throw new IOException("Failed to save output of task: " + - taskAttemptID); - } - } - LOG.debug("Moved " + taskOutput + " to " + finalOutputPath); - } else if(fs.getFileStatus(taskOutput).isDir()) { - FileStatus[] paths = fs.listStatus(taskOutput); - Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, - getTempTaskOutputPath(conf, taskAttemptID)); - fs.mkdirs(finalOutputPath); - if (paths != null) { - for (FileStatus path : paths) { - moveTaskOutputs(conf,taskAttemptID, fs, jobOutputDir, path.getPath()); - } - } - } - } - - public void commitTask(JobConf conf, TaskAttemptID taskAttemptID) - throws IOException { - Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID); - if (taskOutputPath != null) { - FileSystem fs = taskOutputPath.getFileSystem(conf); - if (fs.exists(taskOutputPath)) { - Path jobOutputPath = taskOutputPath.getParent().getParent(); - // Move the task outputs to their final place - moveTaskOutputs(conf,taskAttemptID, fs, jobOutputPath, taskOutputPath); - // Delete the temporary task-specific output directory - if (!fs.delete(taskOutputPath, true)) { - LOG.info("Failed to delete the temporary output" + - " directory of task: " + taskAttemptID + " - " + taskOutputPath); - } - LOG.info("Saved output of task '" + 
taskAttemptID + "' to " + - jobOutputPath); - } - } - } - public boolean needsTaskCommit(JobConf conf, TaskAttemptID taskAttemptID) - throws IOException { - try { - Path taskOutputPath = getTempTaskOutputPath(conf, taskAttemptID); - if (taskOutputPath != null) { - // Get the file-system for the task output directory - FileSystem fs = taskOutputPath.getFileSystem(conf); - // since task output path is created on demand, - // if it exists, task needs a commit - if (fs.exists(taskOutputPath)) { - return true; - } - } - } catch (IOException ioe) { - throw ioe; - } - return false; - } - - public Path getTempTaskOutputPath(JobConf conf, TaskAttemptID taskAttemptID) { - Path outputPath = FileOutputFormat.getOutputPath(conf); - if (outputPath != null) { - Path p = new Path(outputPath, - (FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR + - "_" + taskAttemptID.toString())); - try { - FileSystem fs = p.getFileSystem(conf); - return p.makeQualified(fs); - } catch (IOException ie) { - LOG.warn(StringUtils.stringifyException(ie)); - return p; - } - } - return null; - } - public void cleanupJob(JobConf conf) throws IOException { - // do the clean up of temporary directory - Path outputPath = FileOutputFormat.getOutputPath(conf); - if (outputPath != null) { - Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME); - FileSystem fileSys = tmpDir.getFileSystem(conf); - if (fileSys.exists(tmpDir)) { - fileSys.delete(tmpDir, true); - } - } else { - LOG.warn("Output path is null in cleanup"); - } - } - - public void commitJob(JobConf conf) throws IOException { - cleanupJob(conf); - if (getOutputDirMarking(conf)) { - markSuccessfulOutputDir(conf); - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java deleted file mode 100644 index 6bbf077..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import java.io.Serializable; - -import org.apache.flink.types.Record; - - -/** - * An interface describing a class that is able to - * convert Hadoop types into Flink's Record model. 
- * - * The converter must be Serializable. - * - * Flink provides a DefaultHadoopTypeConverter. Custom implementations should - * chain the type converters. - */ -public interface HadoopTypeConverter<K, V> extends Serializable { - - /** - * Convert a Hadoop type to a Flink type. - */ - public void convert(Record record, K hadoopKey, V hadoopValue); -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java deleted file mode 100644 index 0519ac9..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import org.apache.flink.types.Key; -import org.apache.hadoop.io.WritableComparable; - -public class WritableComparableWrapper<T extends WritableComparable<T>> extends WritableWrapper<T> implements Key<WritableComparableWrapper<T>> { - private static final long serialVersionUID = 1L; - - public WritableComparableWrapper() { - super(); - } - - public WritableComparableWrapper(T toWrap) { - super(toWrap); - } - - @Override - public int compareTo(WritableComparableWrapper<T> o) { - return super.value().compareTo(o.value()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java deleted file mode 100644 index 6369bb7..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import java.io.IOException; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.types.Value; -import org.apache.flink.util.InstantiationUtil; -import org.apache.hadoop.io.Writable; - -public class WritableWrapper<T extends Writable> implements Value { - private static final long serialVersionUID = 2L; - - private T wrapped; - private String wrappedType; - private ClassLoader cl; - - public WritableWrapper() { - } - - public WritableWrapper(T toWrap) { - wrapped = toWrap; - wrappedType = toWrap.getClass().getCanonicalName(); - } - - public T value() { - return wrapped; - } - - @Override - public void write(DataOutputView out) throws IOException { - out.writeUTF(wrappedType); - wrapped.write(out); - } - - @Override - public void read(DataInputView in) throws IOException { - if(cl == null) { - cl = Thread.currentThread().getContextClassLoader(); - } - wrappedType = in.readUTF(); - try { - @SuppressWarnings("unchecked") - Class<T> wrClass = (Class<T>) Class.forName(wrappedType, true, cl).asSubclass(Writable.class); - wrapped = InstantiationUtil.instantiate(wrClass, Writable.class); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Error creating the WritableWrapper", e); - } - wrapped.readFields(in); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java deleted file mode 100644 index 6bb13d1..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.hadoopcompatibility.mapred.record.datatypes; - -import org.apache.flink.types.Record; -import org.apache.flink.types.Value; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; - -@SuppressWarnings("rawtypes") -public class WritableWrapperConverter<K extends WritableComparable, V extends Writable> implements HadoopTypeConverter<K,V> { - private static final long serialVersionUID = 1L; - - @Override - public void convert(Record flinkRecord, K hadoopKey, V hadoopValue) { - flinkRecord.setField(0, convertKey(hadoopKey)); - flinkRecord.setField(1, convertValue(hadoopValue)); - } - - @SuppressWarnings("unchecked") - private final Value convertKey(K in) { - return new WritableComparableWrapper(in); - } - - private final Value convertValue(V in) { - return new WritableWrapper<V>(in); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java deleted file mode 100644 index b167080..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCount.java +++ /dev/null @@ -1,184 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.hadoopcompatibility.mapred.record.example; - -import java.io.Serializable; -import java.util.Iterator; -import java.util.StringTokenizer; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.io.CsvOutputFormat; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.client.LocalExecutor; -import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource; -import org.apache.flink.hadoopcompatibility.mapred.record.datatypes.WritableWrapperConverter; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TextInputFormat; - -/** - * Implements a word count which takes the input file and counts the number of - * the occurrences of each word in the file. - * - * <br /><br /> - * - * <b>Note</b>: This example uses the out-dated Record API. - * It is recommended to use the new Java API. - * - * @see org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount - */ -public class WordCount implements Program, ProgramDescription { - - private static final long serialVersionUID = 1L; - - - /** - * Converts a Record containing one string in to multiple string/integer pairs. - * The string is tokenized by whitespaces. For each token a new record is emitted, - * where the token is the first field and an Integer(1) is the second field. - */ - public static class TokenizeLine extends MapFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void map(Record record, Collector<Record> collector) { - // get the first field (as type StringValue) from the record - String line = record.getField(1, StringValue.class).getValue(); - // normalize the line - line = line.replaceAll("\\W+", " ").toLowerCase(); - - // tokenize the line - StringTokenizer tokenizer = new StringTokenizer(line); - while (tokenizer.hasMoreTokens()) { - String word = tokenizer.nextToken(); - - // we emit a (word, 1) pair - collector.collect(new Record(new StringValue(word), new IntValue(1))); - } - } - } - - /** - * Sums up the counts for a certain given key. The counts are assumed to be at position <code>1</code> - * in the record. The other fields are not modified. 
- */ - @Combinable - @ConstantFields(0) - public static class CountWords extends ReduceFunction implements Serializable { - - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { - Record element = null; - int sum = 0; - - while (records.hasNext()) { - element = records.next(); - int cnt = element.getField(1, IntValue.class).getValue(); - sum += cnt; - } - - element.setField(1, new IntValue(sum)); - out.collect(element); - } - - @Override - public void combine(Iterator<Record> records, Collector<Record> out) throws Exception { - // the logic is the same as in the reduce function, so simply call the reduce method - reduce(records, out); - } - } - - - @SuppressWarnings({ "rawtypes", "unchecked", "unused" }) - @Override - public Plan getPlan(String... args) { - // parse job parameters - int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - String dataInput = (args.length > 1 ? args[1] : ""); - String output = (args.length > 2 ? args[2] : ""); - - - HadoopDataSource source = new HadoopDataSource(new TextInputFormat(), new JobConf(), "Input Lines"); - TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput)); - - // Example with Wrapper Converter - HadoopDataSource<LongWritable,Text> sourceHadoopType = new HadoopDataSource<LongWritable, Text>( - new TextInputFormat(), new JobConf(), "Input Lines", new WritableWrapperConverter<LongWritable, Text>()); - TextInputFormat.addInputPath(sourceHadoopType.getJobConf(), new Path(dataInput)); - - MapOperator mapper = MapOperator.builder(new TokenizeLine()) - .input(source) - .name("Tokenize Lines") - .build(); - ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0) - .input(mapper) - .name("Count Words") - .build(); - FileDataSink out = new FileDataSink(new CsvOutputFormat(), output, reducer, "Word Counts"); - CsvOutputFormat.configureRecordFormat(out) - .recordDelimiter('\n') - .fieldDelimiter(' ') - .field(StringValue.class, 0) - .field(IntValue.class, 1); - - Plan plan = new Plan(out, "WordCount Example"); - plan.setDefaultParallelism(numSubTasks); - return plan; - } - - - @Override - public String getDescription() { - return "Parameters: [numSubTasks] [input] [output]"; - } - - - public static void main(String[] args) throws Exception { - WordCount wc = new WordCount(); - - if (args.length < 3) { - System.err.println(wc.getDescription()); - System.exit(1); - } - - Plan plan = wc.getPlan(args); - - // This will execute the word-count embedded in a local context.
Replace this line by the commented - // succeeding line to send the job to a local installation or to a cluster for execution - LocalExecutor.execute(plan); -// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar"); -// ex.executePlan(plan); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java b/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java deleted file mode 100644 index a838215..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/main/java/org/apache/flink/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.hadoopcompatibility.mapred.record.example; - -import java.io.Serializable; -import java.util.Iterator; -import java.util.StringTokenizer; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.api.common.Program; -import org.apache.flink.api.common.ProgramDescription; -import org.apache.flink.api.java.record.functions.MapFunction; -import org.apache.flink.api.java.record.functions.ReduceFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFields; -import org.apache.flink.api.java.record.operators.MapOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator; -import org.apache.flink.api.java.record.operators.ReduceOperator.Combinable; -import org.apache.flink.client.LocalExecutor; -import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSink; -import org.apache.flink.hadoopcompatibility.mapred.record.HadoopDataSource; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; -import org.apache.flink.util.Collector; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.TextInputFormat; -import org.apache.hadoop.mapred.TextOutputFormat; - -/** - * Implements a word count which takes the input file and counts the number of - * occurrences of each word in the file. - * - * <br /><br /> - * - * <b>Note</b>: This example uses the outdated Record API.
- * It is recommended to use the new Java API. - * - * @see WordCount - */ -@SuppressWarnings("serial") -public class WordCountWithOutputFormat implements Program, ProgramDescription { - - /** - * Converts a Record containing one string into multiple string/integer pairs. - * The string is tokenized by whitespace. For each token a new record is emitted, - * where the token is the first field and an Integer(1) is the second field. - */ - public static class TokenizeLine extends MapFunction implements Serializable { - private static final long serialVersionUID = 1L; - - @Override - public void map(Record record, Collector<Record> collector) { - // get the text line (field 1, as type StringValue) from the record - String line = record.getField(1, StringValue.class).getValue(); - // normalize the line - line = line.replaceAll("\\W+", " ").toLowerCase(); - - // tokenize the line - StringTokenizer tokenizer = new StringTokenizer(line); - while (tokenizer.hasMoreTokens()) { - String word = tokenizer.nextToken(); - - // we emit a (word, 1) pair - collector.collect(new Record(new StringValue(word), new IntValue(1))); - } - } - } - - /** - * Sums up the counts for a given key. The counts are assumed to be at position <code>1</code> - * in the record. The other fields are not modified. - */ - @Combinable - @ConstantFields(0) - public static class CountWords extends ReduceFunction implements Serializable { - - private static final long serialVersionUID = 1L; - - @Override - public void reduce(Iterator<Record> records, Collector<Record> out) throws Exception { - Record element = null; - int sum = 0; - - while (records.hasNext()) { - element = records.next(); - int cnt = element.getField(1, IntValue.class).getValue(); - sum += cnt; - } - - element.setField(1, new IntValue(sum)); - out.collect(element); - } - - @Override - public void combine(Iterator<Record> records, Collector<Record> out) throws Exception { - // the logic is the same as in the reduce function, so simply call the reduce method - reduce(records, out); - } - } - - - @Override - public Plan getPlan(String... args) { - // parse job parameters - int numSubTasks = (args.length > 0 ? Integer.parseInt(args[0]) : 1); - String dataInput = (args.length > 1 ? args[1] : ""); - String output = (args.length > 2 ?
args[2] : ""); - - HadoopDataSource<LongWritable, Text> source = new HadoopDataSource<LongWritable, Text>( - new TextInputFormat(), new JobConf(), "Input Lines"); - TextInputFormat.addInputPath(source.getJobConf(), new Path(dataInput)); - - - MapOperator mapper = MapOperator.builder(new TokenizeLine()) - .input(source) - .name("Tokenize Lines") - .build(); - ReduceOperator reducer = ReduceOperator.builder(CountWords.class, StringValue.class, 0) - .input(mapper) - .name("Count Words") - .build(); - HadoopDataSink<Text, IntWritable> out = new HadoopDataSink<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), new JobConf(), "Hadoop TextOutputFormat", reducer, Text.class, IntWritable.class); - TextOutputFormat.setOutputPath(out.getJobConf(), new Path(output)); - - Plan plan = new Plan(out, "Hadoop OutputFormat Example"); - plan.setDefaultParallelism(numSubTasks); - return plan; - } - - - @Override - public String getDescription() { - return "Parameters: [numSubTasks] [input] [output]"; - } - - - public static void main(String[] args) throws Exception { - WordCountWithOutputFormat wc = new WordCountWithOutputFormat(); - - if (args.length < 3) { - System.err.println(wc.getDescription()); - System.exit(1); - } - - Plan plan = wc.getPlan(args); - - // This will execute the word-count embedded in a local context. Replace this line by the commented - // succeeding line to send the job to a local installation or to a cluster for execution - LocalExecutor.execute(plan); -// PlanExecutor ex = new RemoteExecutor("localhost", 6123, "target/pact-examples-0.4-SNAPSHOT-WordCount.jar"); -// ex.executePlan(plan); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2ff04baa/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java b/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java deleted file mode 100644 index fe7ea8e..0000000 --- a/flink-staging/flink-hadoop-compatibility/src/test/java/org/apache/flink/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - - -package org.apache.flink.test.hadoopcompatibility.mapred.record; - -import org.apache.flink.api.common.Plan; -import org.apache.flink.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat; -import org.apache.flink.test.testdata.WordCountData; -import org.apache.flink.test.util.RecordAPITestBase; - -/** - * Tests the Hadoop InputFormat and OutputFormat wrappers of the Record API. - */ -public class HadoopRecordInputOutputITCase extends RecordAPITestBase { - protected String textPath; - protected String resultPath; - protected String counts; - - @Override - protected void preSubmit() throws Exception { - textPath = createTempFile("text.txt", WordCountData.TEXT); - resultPath = getTempDirPath("result"); - counts = WordCountData.COUNTS.replaceAll(" ", "\t"); - } - - @Override - protected Plan getTestJob() { - // WordCountWithOutputFormat reads its input with Hadoop's TextInputFormat and writes its output with Hadoop's TextOutputFormat - WordCountWithOutputFormat wc = new WordCountWithOutputFormat(); - return wc.getPlan("1", textPath, resultPath); - } - - @Override - protected void postSubmit() throws Exception { - // Test results; append /1 to resultPath due to the generated _temporary file. - compareResultsByLinesInMemory(counts, resultPath + "/1"); - } -}
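----------------------------------------------------------------------
For users migrating off the removed Record API examples: their javadoc points to org.apache.flink.hadoopcompatibility.mapred.example.HadoopMapredCompatWordCount as the replacement. The sketch below shows the same word count written against the DataSet API, reading and writing through the mapred HadoopInputFormat/HadoopOutputFormat wrappers. It is a minimal illustration under the assumption of the flink-java mapred wrappers; the class name WordCountDataSetSketch and the inlined tokenizer are not part of this commit.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.hadoop.mapred.HadoopInputFormat;
import org.apache.flink.api.java.hadoop.mapred.HadoopOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

/**
 * Word count on the DataSet API, reading and writing through Hadoop's mapred
 * TextInputFormat / TextOutputFormat. Illustrative sketch, not shipped code.
 */
public class WordCountDataSetSketch {

    public static void main(String[] args) throws Exception {
        final String input = args[0];
        final String output = args[1];

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read (byte offset, line) pairs with Hadoop's TextInputFormat
        JobConf inConf = new JobConf();
        TextInputFormat.addInputPath(inConf, new Path(input));
        DataSet<Tuple2<LongWritable, Text>> lines = env.createInput(
                new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, inConf));

        // tokenize and count; same logic as the removed TokenizeLine / CountWords pair
        DataSet<Tuple2<Text, IntWritable>> counts = lines
            .flatMap(new FlatMapFunction<Tuple2<LongWritable, Text>, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(Tuple2<LongWritable, Text> value, Collector<Tuple2<String, Integer>> out) {
                    for (String word : value.f1.toString().toLowerCase().split("\\W+")) {
                        if (!word.isEmpty()) {
                            out.collect(new Tuple2<String, Integer>(word, 1));
                        }
                    }
                }
            })
            .groupBy(0).sum(1)
            .map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, IntWritable>>() {
                @Override
                public Tuple2<Text, IntWritable> map(Tuple2<String, Integer> t) {
                    return new Tuple2<Text, IntWritable>(new Text(t.f0), new IntWritable(t.f1));
                }
            });

        // write (word, count) pairs with Hadoop's TextOutputFormat
        JobConf outConf = new JobConf();
        TextOutputFormat.setOutputPath(outConf, new Path(output));
        counts.output(new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), outConf));

        env.execute("Hadoop-compatibility WordCount (DataSet API)");
    }
}

Input and output paths are passed as the two program arguments. As with the removed WordCountWithOutputFormat, the output directory contains Hadoop-style part files with tab-separated word/count pairs, which is the layout the removed HadoopRecordInputOutputITCase verified.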