[FLINK-2906] Remove Record API

This closes #1403
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c787a037
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c787a037
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c787a037

Branch: refs/heads/master
Commit: c787a037de5bb456844f9704389bff65b4972e18
Parents: 8fddbf0
Author: zentol <ches...@apache.org>
Authored: Tue Nov 24 19:19:42 2015 +0100
Committer: Fabian Hueske <fhue...@apache.org>
Committed: Wed Nov 25 23:45:24 2015 +0100

----------------------------------------------------------------------
 docs/apis/programming_guide.md | 4 +-
 docs/internals/fig/projects_dependencies.svg | 6 -
 .../common/functions/GenericCollectorMap.java | 33 --
 .../base/CollectorMapOperatorBase.java | 60 ---
 .../java/org/apache/flink/types/Record.java | 2 +-
 .../java/record/functions/CoGroupFunction.java | 57 ---
 .../java/record/functions/CrossFunction.java | 51 --
 .../record/functions/FunctionAnnotation.java | 482 -------
 .../api/java/record/functions/JoinFunction.java | 44 --
 .../api/java/record/functions/MapFunction.java | 53 --
 .../record/functions/MapPartitionFunction.java | 54 ---
 .../java/record/functions/ReduceFunction.java | 83 ----
 .../java/record/io/CollectionInputFormat.java | 97 ----
 .../api/java/record/io/CsvInputFormat.java | 414 ----------------
 .../api/java/record/io/CsvOutputFormat.java | 479 ------------------
 .../java/record/io/DelimitedInputFormat.java | 49 --
 .../java/record/io/DelimitedOutputFormat.java | 319 ------------
 .../ExternalProcessFixedLengthInputFormat.java | 227 ---------
 .../record/io/ExternalProcessInputFormat.java | 143 ------
 .../record/io/ExternalProcessInputSplit.java | 58 ---
 .../api/java/record/io/FileInputFormat.java | 31 --
 .../api/java/record/io/FileOutputFormat.java | 32 --
 .../java/record/io/FixedLengthInputFormat.java | 245 ----------
 .../api/java/record/io/GenericInputFormat.java | 29 --
 .../api/java/record/io/TextInputFormat.java | 142 ------
 .../java/record/operators/BulkIteration.java | 49 --
 .../java/record/operators/CoGroupOperator.java | 399 ---------------
 .../record/operators/CollectionDataSource.java | 225 ---------
 .../java/record/operators/CrossOperator.java | 241 ----------
 .../operators/CrossWithLargeOperator.java | 98 ----
 .../operators/CrossWithSmallOperator.java | 98 ----
 .../java/record/operators/DeltaIteration.java | 99 ----
 .../api/java/record/operators/FileDataSink.java | 236 ---------
 .../java/record/operators/FileDataSource.java | 78 ---
 .../java/record/operators/GenericDataSink.java | 250 ----------
 .../record/operators/GenericDataSource.java | 77 ---
 .../api/java/record/operators/JoinOperator.java | 326 -------------
 .../api/java/record/operators/MapOperator.java | 201 --------
 .../record/operators/MapPartitionOperator.java | 200 --------
 .../record/operators/OperatorInfoHelper.java | 49 --
 .../java/record/operators/ReduceOperator.java | 407 ----------------
 .../record/CoGroupWrappingFunctionTest.java | 206 --------
 .../java/record/ReduceWrappingFunctionTest.java | 233 ---------
 .../api/java/record/io/CsvInputFormatTest.java | 406 ----------------
 .../api/java/record/io/CsvOutputFormatTest.java | 465 ------------------
 ...ternalProcessFixedLengthInputFormatTest.java | 298 ------------
 .../io/ExternalProcessInputFormatTest.java | 283 -----------
 .../record/io/FixedLenghtInputFormatTest.java | 212 --------
 .../api/java/record/io/TextInputFormatTest.java | 158 ------
 .../flink/optimizer/costs/CostEstimator.java | 3 +-
 .../flink/optimizer/dag/CollectorMapNode.java | 62 ---
 .../operators/CollectorMapDescriptor.java | 75 ---
 .../plandump/PlanJSONDumpGenerator.java | 1 -
 .../optimizer/plantranslate/JsonMapper.java | 1 -
 .../traversals/GraphCreatingVisitor.java | 4 -
 .../runtime/operators/CollectorMapDriver.java | 118 -----
 .../flink/runtime/operators/DriverStrategy.java | 3 -
 .../chaining/ChainedCollectorMapDriver.java | 87 ----
 .../flink/test/util/RecordAPITestBase.java | 146 ------
 59 files changed, 3 insertions(+), 8985 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index fe3edaa..71a5e26 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -3079,9 +3079,7 @@ attribute.
 Both the command line and the web interface support a parameter to pass the
 class name manually for cases where the JAR manifest contains neither attribute.
 2. If the entry point class implements the `org.apache.flink.api.common.Program` interface, then the system
-calls the `getPlan(String...)` method to obtain the program plan to execute. The
-`getPlan(String...)` method was the only possible way of defining a program in the *Record API*
-(see [0.4 docs](http://stratosphere.eu/docs/0.4/)) and is also supported in the new Java API.
+calls the `getPlan(String...)` method to obtain the program plan to execute.
 3. If the entry point class does not implement the `org.apache.flink.api.common.Program` interface,
 the system will invoke the main method of the class.

http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/docs/internals/fig/projects_dependencies.svg
----------------------------------------------------------------------
diff --git a/docs/internals/fig/projects_dependencies.svg b/docs/internals/fig/projects_dependencies.svg
index d537ab8..76f6276 100644
--- a/docs/internals/fig/projects_dependencies.svg
+++ b/docs/internals/fig/projects_dependencies.svg
@@ -176,12 +176,6 @@ under the License.
y="145.73346" style="font-size:15.003737px;fill:#000000;font-style:italic;font-weight:normal;text-align:start;text-anchor:start;font-family:Verdana;" id="text3029">Java API</text> - <text - xml:space="preserve" - x="127.97233" - y="163.73794" - style="font-size:15.003737px;fill:#000000;font-style:italic;font-weight:normal;text-align:start;text-anchor:start;font-family:Verdana;" - id="text3031">Old Record API</text> <path style="fill:#ffffff;fill-rule:evenodd;fill-opacity:1;stroke:none;" d=" M 260.48364 198.25564 C 260.48364 195.0861 263.05303 192.51671 266.20382 192.51671 L 399.26822 192.51671 C 402.419 192.51671 404.98839 195.0861 404.98839 198.25564 L 404.98839 221.13634 C 404.98839 224.30588 402.419 226.87527 399.26822 226.87527 L 266.20382 226.87527 C 263.05303 226.87527 260.48364 224.30588 260.48364 221.13634 z" http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java deleted file mode 100644 index d335862..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/functions/GenericCollectorMap.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.functions; - -import org.apache.flink.util.Collector; - -/** - * Variant of the flat map that is used for backwards compatibility in the deprecated Record-API- - * - * @param <T> The input data type. - * @param <O> The result data type. - */ -@Deprecated -public interface GenericCollectorMap<T, O> extends RichFunction { - - void map(T record, Collector<O> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java deleted file mode 100644 index b7ff2ce..0000000 --- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/CollectorMapOperatorBase.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.common.operators.base; - -import java.util.List; - -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.functions.GenericCollectorMap; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.common.operators.SingleInputOperator; -import org.apache.flink.api.common.operators.UnaryOperatorInformation; -import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; -import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; - -/** - * The CollectorMap is the old version of the Map operator. It is effectively a "flatMap", where the - * UDF is called "map". - * - * @see GenericCollectorMap - */ -@Deprecated -@SuppressWarnings("deprecation") -public class CollectorMapOperatorBase<IN, OUT, FT extends GenericCollectorMap<IN, OUT>> extends SingleInputOperator<IN, OUT, FT> { - - public CollectorMapOperatorBase(UserCodeWrapper<FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) { - super(udf, operatorInfo, name); - } - - public CollectorMapOperatorBase(FT udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) { - super(new UserCodeObjectWrapper<FT>(udf), operatorInfo, name); - } - - public CollectorMapOperatorBase(Class<? extends FT> udf, UnaryOperatorInformation<IN, OUT> operatorInfo, String name) { - super(new UserCodeClassWrapper<FT>(udf), operatorInfo, name); - } - - // -------------------------------------------------------------------------------------------- - - @Override - protected List<OUT> executeOnCollections(List<IN> inputData, RuntimeContext ctx, ExecutionConfig executionConfig) { - throw new UnsupportedOperationException(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-core/src/main/java/org/apache/flink/types/Record.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java index 8ef972f..7cbbc44 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Record.java +++ b/flink-core/src/main/java/org/apache/flink/types/Record.java @@ -34,7 +34,7 @@ import org.apache.flink.util.InstantiationUtil; /** - * The Record represents a multi-valued data record and forms the base of the "Record API" + * The Record represents a multi-valued data record. * The record is a tuple of arbitrary values. It implements a sparse tuple model, meaning that the record can contain * many fields which are actually null and not represented in the record. It has internally a bitmap marking which fields * are set and which are not. 
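Note that the Record class itself stays in flink-core after this commit; only the API built on top of it is removed, and its javadoc no longer calls it the base of a "Record API". A minimal sketch of the sparse-tuple behavior that javadoc describes (the field layout and values are chosen purely for illustration):

    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Record;
    import org.apache.flink.types.StringValue;

    public class RecordSketch {
        public static void main(String[] args) {
            // Sparse tuple: three field positions exist, but only 0 and 2 are set;
            // the unset position is tracked in the internal bitmap, not stored.
            Record record = new Record(3);
            record.setField(0, new StringValue("alice"));
            record.setField(2, new IntValue(42));

            // Fields are read by position together with the expected Value type;
            // the value object is materialized on access.
            StringValue name = record.getField(0, StringValue.class);
            IntValue count = record.getField(2, IntValue.class);
            System.out.println(name.getValue() + " -> " + count.getValue());
        }
    }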
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java deleted file mode 100644 index 1ddf362..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CoGroupFunction.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.functions; - -import java.util.Iterator; - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * - * The CoGroupFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CoGroupOperator}. - */ - -@Deprecated -public abstract class CoGroupFunction extends AbstractRichFunction { - - private static final long serialVersionUID = 1L; - - /** - * This method must be implemented to provide a user implementation of a - * matcher. It is called for each two key-value pairs that share the same - * key and come from different inputs. - * - * @param records1 The records from the first input which were paired with the key. - * @param records2 The records from the second input which were paired with the key. - * @param out A collector that collects all output pairs. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the task and lets the fail-over logic - * decide whether to retry the task execution. 
- */ - public abstract void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java deleted file mode 100644 index eaf34a0..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/CrossFunction.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.java.record.functions; - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.types.Record; - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * The CrossFunction is the base class for functions that are invoked by a {@link org.apache.flink.api.java.record.operators.CrossOperator}. - */ -@Deprecated -public abstract class CrossFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.CrossFunction<Record, Record, Record> { - - private static final long serialVersionUID = 1L; - - /** - * This method must be implemented to provide a user implementation of a cross. - * It is called for each element of the Cartesian product of both input sets. - - * @param first The record from the second input. - * @param second The record from the second input. - * @return The result of the cross UDF - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the task and lets the fail-over logic - * decide whether to retry the task execution. 
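Before the cross(...) signature below, a migration note: the per-key coGroup contract deleted above carries over to org.apache.flink.api.common.functions.CoGroupFunction in the new Java API, with typed inputs instead of Record and Iterable instead of Iterator. A hedged sketch (the Tuple2 element type and the summing logic are illustrative assumptions, not part of this commit):

    import org.apache.flink.api.common.functions.CoGroupFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // New-API counterpart of the removed Record CoGroupFunction.
    public class SumPerKey implements
            CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public void coGroup(Iterable<Tuple2<String, Integer>> first,
                            Iterable<Tuple2<String, Integer>> second,
                            Collector<Tuple2<String, Integer>> out) {
            // Each iterable holds the records of one key from one input;
            // at least one of the two is non-empty when this is called.
            String key = null;
            int sum = 0;
            for (Tuple2<String, Integer> t : first) { key = t.f0; sum += t.f1; }
            for (Tuple2<String, Integer> t : second) { key = t.f0; sum += t.f1; }
            out.collect(new Tuple2<>(key, sum));
        }
    }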
- */ - @Override - public abstract Record cross(Record first, Record second) throws Exception; - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java deleted file mode 100644 index 71d2a62..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/FunctionAnnotation.java +++ /dev/null @@ -1,482 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.functions; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -import org.apache.flink.api.common.operators.DualInputSemanticProperties; -import org.apache.flink.api.common.operators.SingleInputSemanticProperties; -import org.apache.flink.api.common.operators.util.FieldSet; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; - - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * - * - * This class defines the semantic assertions that can be added to functions. - * The assertions are realized as java annotations, to be added to the class declaration of - * the class that realized the user function. For example, to declare the <i>ConstantFieldsExcept</i> - * annotation for a map-type function that realizes a simple absolute function, - * use it the following way: - * - * <pre>{@code - * {@literal @}ConstantFieldsExcept(fields={2}) - * public class MyMapper extends MapFunction - * { - * public void map(Record record, Collector out) - * { - * int value = record.getField(2, IntValue.class).getValue(); - record.setField(2, new IntValue(Math.abs(value))); - - out.collect(record); - * } - * } - * }</pre> - * - * Be aware that some annotations should only be used for functions with as single input - * ({@link MapFunction}, {@link ReduceFunction}) and some only for stubs with two inputs - * ({@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}). - */ -@Deprecated -public class FunctionAnnotation { - - /** - * Specifies the fields of an input record that are unchanged in the output of - * a stub with a single input ( {@link MapFunction}, {@link ReduceFunction}). 
- * - * A field is considered to be constant if its value is not changed and copied to the same position of - * output record. - * - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * inserted! Otherwise, the correct execution of a program can not be guaranteed. - * So if in doubt, do not add a field to this set. - * </b> - * - * This annotation is mutually exclusive with the {@link ConstantFieldsExcept} annotation. - * - * If this annotation and the {@link ConstantFieldsExcept} annotation is not set, it is - * assumed that <i>no</i> field is constant. - * - */ - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFields { - int[] value(); - } - - /** - * Specifies that all fields of an input record that are unchanged in the output of - * a {@link MapFunction}, or {@link ReduceFunction}). - * - * A field is considered to be constant if its value is not changed and copied to the same position of - * output record. - * - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * inserted! Otherwise, the correct execution of a program can not be guaranteed. - * So if in doubt, do not add a field to this set. - * </b> - * - * This annotation is mutually exclusive with the {@link ConstantFieldsExcept} annotation. - * - * If this annotation and the {@link ConstantFieldsExcept} annotation is not set, it is - * assumed that <i>no</i> field is constant. - */ - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - public @interface AllFieldsConstants {} - - /** - * Specifies the fields of an input record of the first input that are unchanged in - * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}) - * - * A field is considered to be constant if its value is not changed and copied to the same position of - * output record. - * - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * inserted! Otherwise, the correct execution of a program can not be guaranteed. - * So if in doubt, do not add a field to this set. - * </b> - * - * This annotation is mutually exclusive with the {@link ConstantFieldsFirstExcept} annotation. - * - * If this annotation and the {@link ConstantFieldsFirstExcept} annotation is not set, it is - * assumed that <i>no</i> field is constant. - * - * - */ - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsFirst { - int[] value(); - } - - /** - * Specifies the fields of an input record of the second input that are unchanged in - * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}) - * - * A field is considered to be constant if its value is not changed and copied to the same position of - * output record. - * - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * inserted! Otherwise, the correct execution of a program can not be guaranteed. 
- * So if in doubt, do not add a field to this set. - * </b> - * - * This annotation is mutually exclusive with the {@link ConstantFieldsSecondExcept} annotation. - * - * If this annotation and the {@link ConstantFieldsSecondExcept} annotation is not set, it is - * assumed that <i>no</i> field is constant. - * - */ - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsSecond { - int[] value(); - } - - /** - * Specifies the fields of an input record that are changed in the output of - * a stub with a single input ( {@link MapFunction}, {@link ReduceFunction}). All other - * fields are assumed to be constant. - * - * A field is considered to be constant if its value is not changed and copied to the same position of - * output record. - * - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * inserted! Otherwise, the correct execution of a program can not be guaranteed. - * So if in doubt, do not add a field to this set. - * </b> - * - * This annotation is mutually exclusive with the {@link ConstantFields} annotation. - * - * If this annotation and the {@link ConstantFields} annotation is not set, it is - * assumed that <i>no</i> field is constant. - * - */ - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsExcept { - int[] value(); - } - - /** - * Specifies the fields of an input record of the first input that are changed in - * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}) - * All other fields are assumed to be constant. - * - * A field is considered to be constant if its value is not changed and copied to the same position of - * output record. - * - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * inserted! Otherwise, the correct execution of a program can not be guaranteed. - * So if in doubt, do not add a field to this set. - * </b> - * - * This annotation is mutually exclusive with the {@link ConstantFieldsFirst} annotation. - * - * If this annotation and the {@link ConstantFieldsFirst} annotation is not set, it is - * assumed that <i>no</i> field is constant. - * - */ - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsFirstExcept { - int[] value(); - } - - - /** - * Specifies the fields of an input record of the second input that are changed in - * the output of a stub with two inputs ( {@link CrossFunction}, {@link JoinFunction}, {@link CoGroupFunction}) - * All other fields are assumed to be constant. - * - * A field is considered to be constant if its value is not changed and copied to the same position of - * output record. - * - * <b> - * It is very important to follow a conservative strategy when specifying constant fields. - * Only fields that are always constant (regardless of value, stub call, etc.) to the output may be - * inserted! Otherwise, the correct execution of a program can not be guaranteed. - * So if in doubt, do not add a field to this set. - * </b> - * - * This annotation is mutually exclusive with the {@link ConstantFieldsSecond} annotation. 
- * - * If this annotation and the {@link ConstantFieldsSecond} annotation is not set, it is - * assumed that <i>no</i> field is constant. - * - */ - @Target(ElementType.TYPE) - @Retention(RetentionPolicy.RUNTIME) - public @interface ConstantFieldsSecondExcept { - int[] value(); - } - - /** - * Private constructor to prevent instantiation. This class is intended only as a container. - */ - private FunctionAnnotation() {} - - // -------------------------------------------------------------------------------------------- - // Function Annotation Handling - // -------------------------------------------------------------------------------------------- - - public static SingleInputSemanticProperties readSingleConstantAnnotations(UserCodeWrapper<?> udf) { - - // get constantSet annotation from stub - AllFieldsConstants allConstants = udf.getUserCodeAnnotation(AllFieldsConstants.class); - ConstantFields constantSet = udf.getUserCodeAnnotation(ConstantFields.class); - ConstantFieldsExcept notConstantSet = udf.getUserCodeAnnotation(ConstantFieldsExcept.class); - - if (notConstantSet != null && (constantSet != null || allConstants != null)) { - throw new RuntimeException("Either ConstantFields or ConstantFieldsExcept can be specified, not both."); - } - - // extract notConstantSet from annotation - if (notConstantSet != null) { - FieldSet nonConstant = new FieldSet(notConstantSet.value()); - return new ImplicitlyForwardingSingleInputSemanticProperties(nonConstant); - } - - // extract notConstantSet from annotation - if (allConstants != null) { - return new SingleInputSemanticProperties.AllFieldsForwardedProperties(); - } - - SingleInputSemanticProperties semanticProperties = new SingleInputSemanticProperties(); - - // extract constantSet from annotation - if (constantSet != null) { - for (int value: constantSet.value()) { - semanticProperties.addForwardedField(value,value); - } - } - - return semanticProperties; - } - - // -------------------------------------------------------------------------------------------- - - public static DualInputSemanticProperties readDualConstantAnnotations(UserCodeWrapper<?> udf) { - ImplicitlyForwardingTwoInputSemanticProperties semanticProperties = new ImplicitlyForwardingTwoInputSemanticProperties(); - - // get readSet annotation from stub - ConstantFieldsFirst constantSet1Annotation = udf.getUserCodeAnnotation(ConstantFieldsFirst.class); - ConstantFieldsSecond constantSet2Annotation = udf.getUserCodeAnnotation(ConstantFieldsSecond.class); - - // get readSet annotation from stub - ConstantFieldsFirstExcept notConstantSet1Annotation = udf.getUserCodeAnnotation(ConstantFieldsFirstExcept.class); - ConstantFieldsSecondExcept notConstantSet2Annotation = udf.getUserCodeAnnotation(ConstantFieldsSecondExcept.class); - - - if (notConstantSet1Annotation != null && constantSet1Annotation != null) { - throw new RuntimeException("Either ConstantFieldsFirst or ConstantFieldsFirstExcept can be specified, not both."); - } - - if (constantSet2Annotation != null && notConstantSet2Annotation != null) { - throw new RuntimeException("Either ConstantFieldsSecond or ConstantFieldsSecondExcept can be specified, not both."); - } - - - // extract readSets from annotations - if(notConstantSet1Annotation != null) { - semanticProperties.setImplicitlyForwardingFirstExcept(new FieldSet(notConstantSet1Annotation.value())); - } - - if(notConstantSet2Annotation != null) { - semanticProperties.setImplicitlyForwardingSecondExcept(new FieldSet(notConstantSet2Annotation.value())); - } - - // extract 
readSets from annotations - if (constantSet1Annotation != null) { - for(int value: constantSet1Annotation.value()) { - semanticProperties.addForwardedField(0, value, value); - } - } - - if (constantSet2Annotation != null) { - for(int value: constantSet2Annotation.value()) { - semanticProperties.addForwardedField(1, value, value); - } - } - - return semanticProperties; - } - - - private static final class ImplicitlyForwardingSingleInputSemanticProperties extends SingleInputSemanticProperties { - - private static final long serialVersionUID = 1l; - - private FieldSet nonForwardedFields; - - private ImplicitlyForwardingSingleInputSemanticProperties(FieldSet nonForwardedFields) { - this.nonForwardedFields = nonForwardedFields; - } - - @Override - public FieldSet getForwardingTargetFields(int input, int sourceField) { - - if (input != 0) { - throw new IndexOutOfBoundsException(); - } - - if (nonForwardedFields == null) { - return super.getForwardingTargetFields(input, sourceField); - } else { - if (this.nonForwardedFields.contains(sourceField)) { - return FieldSet.EMPTY_SET; - } else { - return new FieldSet(sourceField); - } - } - } - - @Override - public int getForwardingSourceField(int input, int targetField) { - - if (input != 0) { - throw new IndexOutOfBoundsException(); - } - - if (nonForwardedFields == null) { - return super.getForwardingSourceField(input, targetField); - } else { - if (this.nonForwardedFields.contains(targetField)) { - return -1; - } else { - return targetField; - } - } - } - - @Override - public FieldSet getReadFields(int input) { - return null; - } - - @Override - public void addForwardedField(int sourceField, int destinationField) { - if (this.nonForwardedFields == null) { - super.addForwardedField(sourceField, destinationField); - } else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); - } - } - - } - - private static final class ImplicitlyForwardingTwoInputSemanticProperties extends DualInputSemanticProperties { - private static final long serialVersionUID = 1L; - - private FieldSet nonForwardedFields1; - private FieldSet nonForwardedFields2; - - private ImplicitlyForwardingTwoInputSemanticProperties() {} - - public void setImplicitlyForwardingFirstExcept(FieldSet nonForwardedFields) { - this.nonForwardedFields1 = nonForwardedFields; - } - - public void setImplicitlyForwardingSecondExcept(FieldSet nonForwardedFields) { - this.nonForwardedFields2 = nonForwardedFields; - } - - @Override - public FieldSet getForwardingTargetFields(int input, int sourceField) { - - if(input != 0 && input != 1) { - throw new IndexOutOfBoundsException(); - } else if (input == 0) { - - if (this.nonForwardedFields1 == null) { - return super.getForwardingTargetFields(0, sourceField); - } - else { - if (this.nonForwardedFields1.contains(sourceField)) { - return FieldSet.EMPTY_SET; - } else { - return new FieldSet(sourceField); - } - } - } else { - - if (this.nonForwardedFields2 == null) { - return super.getForwardingTargetFields(1, sourceField); - } - else { - if (this.nonForwardedFields2.contains(sourceField)) { - return FieldSet.EMPTY_SET; - } else { - return new FieldSet(sourceField); - } - } - } - } - - @Override - public void addForwardedField(int input, int sourceField, int destinationField) { - if (input != 0 && input != 1) { - throw new IndexOutOfBoundsException(); - } else if (input == 0) { - if (this.nonForwardedFields1 
== null) { - super.addForwardedField(0, sourceField, destinationField); - } else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); - } - } else { - if (this.nonForwardedFields2 == null) { - super.addForwardedField(1, sourceField, destinationField); - } else { - throw new UnsupportedOperationException("When defining fields as implicitly constant for an input" + - "(such as through the ConstantFieldsFirstExcept annotation), you cannot manually add forwarded fields."); - } - } - } - - @Override - public FieldSet getReadFields(int input) { - return null; - } - - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java deleted file mode 100644 index 3afb271..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/JoinFunction.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.functions; - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * - * The JoinFunction must implementation by functions of a {@link org.apache.flink.api.java.record.operators.JoinOperator}. - * It resembles an equality join of both inputs on their key fields. 
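Before the JoinFunction declaration continues below, a note on the FunctionAnnotation class deleted above: the new Java API expresses the same constant-field hints through semantic annotations such as FunctionAnnotation.ForwardedFields. A sketch under that assumption, reusing the absolute-value example from the removed javadoc:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Declares that tuple field f0 reaches the output unchanged, giving the
    // optimizer the same hint the old @ConstantFields(fields = {0}) expressed.
    @ForwardedFields("f0")
    public class AbsMapper implements MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public Tuple2<String, Integer> map(Tuple2<String, Integer> value) {
            value.f1 = Math.abs(value.f1); // f1 changes, so it must not be declared forwarded
            return value;
        }
    }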
- */ - -@Deprecated -public abstract class JoinFunction extends AbstractRichFunction implements FlatJoinFunction<Record, Record, Record> { - - private static final long serialVersionUID = 1L; - - @Override - public abstract void join(Record value1, Record value2, Collector<Record> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java deleted file mode 100644 index e51c35f..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapFunction.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.functions; - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.api.common.functions.GenericCollectorMap; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * The MapFunction must be extended to provide a mapper implementation - * By definition, the mapper is called for each individual input record. - */ -@Deprecated -public abstract class MapFunction extends AbstractRichFunction implements GenericCollectorMap<Record, Record> { - - private static final long serialVersionUID = 1L; - - /** - * This method must be implemented to provide a user implementation of a mapper. - * It is called for each individual record. - * - * @param record The record to be mapped. - * @param out A collector that collects all output records. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the map task and lets the fail-over logic - * decide whether to retry the mapper execution. 
- */ - @Override - public abstract void map(Record record, Collector<Record> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java deleted file mode 100644 index ac18c95..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/MapPartitionFunction.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.functions; - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * - * The MapPartitionFunction must be extended to provide a map partition implementation - * By definition, the map partition is called for a full input set. - */ - -@Deprecated -public abstract class MapPartitionFunction extends AbstractRichFunction implements org.apache.flink.api.common.functions.MapPartitionFunction<Record, Record> { - - private static final long serialVersionUID = 1L; - - /** - * This method must be implemented to provide a user implementation of a MapPartitionFunction. - * It is called for a full input set. - * - * @param values all input records - * @param out A collector that collects all output records. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the map task and lets the fail-over logic - * decide whether to retry the mapper execution. 
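Ahead of the mapPartition(...) signature below, a migration note: the removed Record MapFunction was a collector-style map (one input record, zero or more output records, compare GenericCollectorMap above), which corresponds to FlatMapFunction in the new API. A brief sketch (the tokenizing logic is an illustrative assumption):

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.util.Collector;

    // Collector-style map in the new API: emit any number of records per input.
    public class Tokenizer implements FlatMapFunction<String, String> {

        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String token : line.split("\\s+")) {
                if (!token.isEmpty()) {
                    out.collect(token); // zero outputs for blank lines, many otherwise
                }
            }
        }
    }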
- */ - @Override - public abstract void mapPartition(Iterable<Record> values, Collector<Record> out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java b/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java deleted file mode 100644 index 96350aa..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/functions/ReduceFunction.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.functions; - -import java.util.Iterator; - -import org.apache.flink.api.common.functions.AbstractRichFunction; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * - * The ReduceFunction must be extended to provide a reducer implementation, as invoked by a - * {@link org.apache.flink.api.java.record.operators.ReduceOperator}. - */ -@Deprecated -public abstract class ReduceFunction extends AbstractRichFunction { - - private static final long serialVersionUID = 1L; - - /** - * The central function to be implemented for a reducer. The function receives per call one - * key and all the values that belong to that key. Each key is guaranteed to be processed by exactly - * one function call across all involved instances across all computing nodes. - * - * @param records All records that belong to the given input key. - * @param out The collector to hand results to. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the reduce task and lets the fail-over logic - * decide whether to retry the reduce execution. - */ - public abstract void reduce(Iterator<Record> records, Collector<Record> out) throws Exception; - - /** - * No default implementation provided. - * This method must be overridden by reduce stubs that want to make use of the combining feature. - * In addition, the ReduceFunction extending class must be annotated as Combinable. - * Note that this function must be implemented, if the reducer is annotated as combinable. - * <p> - * The use of the combiner is typically a pre-reduction of the data. 
It works similar as the reducer, only that is - * is not guaranteed to see all values with the same key in one call to the combine function. Since it is called - * prior to the <code>reduce()</code> method, input and output types of the combine method are the input types of - * the <code>reduce()</code> method. - * - * @see org.apache.flink.api.java.record.operators.ReduceOperator.Combinable - * @param records - * The records to be combined. Unlike in the reduce method, these are not necessarily all records - * belonging to the given key. - * @param out The collector to write the result to. - * - * @throws Exception Implementations may forward exceptions, which are caught by the runtime. When the - * runtime catches an exception, it aborts the combine task and lets the fail-over logic - * decide whether to retry the combiner execution. - */ - public void combine(Iterator<Record> records, Collector<Record> out) throws Exception { - // to be implemented, if the reducer should use a combiner. Note that the combining method - // is only used, if the stub class is further annotated with the annotation - // @ReduceOperator.Combinable - reduce(records, out); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java deleted file mode 100644 index c94c727..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CollectionInputFormat.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import java.io.IOException; -import java.util.Collection; -import java.util.Iterator; - -import org.apache.flink.api.common.io.GenericInputFormat; -import org.apache.flink.api.common.io.NonParallelInput; -import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.types.Record; -import org.apache.flink.types.ValueUtil; - -/** - * input format for java collection input. 
It can accept collection data or serializable iterator - */ -public class CollectionInputFormat extends GenericInputFormat<Record> implements NonParallelInput { - - private static final long serialVersionUID = 1L; - - private Collection<?> dataSet; // input data as collection - - private Iterator<?> serializableIter; // input data as serializable iterator - - private transient Iterator<?> it; - - @Override - public boolean reachedEnd() throws IOException { - return !it.hasNext(); - } - - @Override - public void open(GenericInputSplit split) throws IOException { - super.open(split); - if (serializableIter != null) { - it = serializableIter; - } - else { - it = this.dataSet.iterator(); - } - } - - @Override - public Record nextRecord(Record record) throws IOException { - if (it.hasNext()) { - record.clear(); - Object b = it.next(); - // check whether the record field is one-dimensional or multi-dimensional - if (b.getClass().isArray()) { - for (Object s : (Object[]) b) { - record.addField(ValueUtil.toFlinkValueType(s)); - } - } - else if (b instanceof Collection) { - @SuppressWarnings("unchecked") - Iterator<Object> tmpIter = ((Collection<Object>) b).iterator(); - while (tmpIter.hasNext()) { - Object s = tmpIter.next(); - record.addField(ValueUtil.toFlinkValueType(s)); - } - } - else { - record.setField(0, ValueUtil.toFlinkValueType(b)); - } - return record; - } else { - return null; - } - } - - public void setData(Collection<?> data) { - this.dataSet = data; - this.serializableIter = null; - } - - public <T extends Iterator<?>, Serializable> void setIter(T iter) { - this.serializableIter = iter; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java deleted file mode 100644 index 4e92874..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvInputFormat.java +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.api.java.record.io; - -import com.google.common.base.Preconditions; - -import org.apache.flink.api.common.io.GenericCsvInputFormat; -import org.apache.flink.api.common.io.ParseException; -import org.apache.flink.api.common.operators.CompilerHints; -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.java.record.operators.FileDataSource; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.types.DoubleValue; -import org.apache.flink.types.IntValue; -import org.apache.flink.types.LongValue; -import org.apache.flink.types.Record; -import org.apache.flink.types.Value; -import org.apache.flink.types.parser.FieldParser; - -import java.io.IOException; - -/** - * Input format to parse text files and generate Records. - * The input file is structured by record delimiters and field delimiters (CSV files are common). - * Record delimiter separate records from each other ('\n' is common). - * Field delimiters separate fields within a record. - * Record and field delimiters must be configured using the InputFormat {@link Configuration}. - * - * The number of fields to parse must be configured as well. - * For each field a data type must be specified using the {@link CsvInputFormat#FIELD_TYPE_PARAMETER_PREFIX} config key. - * - * The position within the text record can be configured for each field using the {@link CsvInputFormat#TEXT_POSITION_PARAMETER_PREFIX} config key. - * Either all text positions must be configured or none. If none is configured, the index of the config key is used. - * The position of a value within the {@link Record} is the index of the config key. - * - * @see Configuration - * @see Record - */ -@SuppressWarnings("deprecation") -public class CsvInputFormat extends GenericCsvInputFormat<Record> { - - private static final long serialVersionUID = 1L; - - private transient Value[] parsedValues; - - private int[] targetPositions = new int[0]; - - private boolean configured = false; - - //To speed up readRecord processing. Used to find windows line endings. - //It is set when open so that readRecord does not have to evaluate it - private boolean lineDelimiterIsLinebreak = false; - - // -------------------------------------------------------------------------------------------- - // Constructors and getters/setters for the configurable parameters - // -------------------------------------------------------------------------------------------- - - public CsvInputFormat() { - super(); - } - - public CsvInputFormat(char fieldDelimiter) { - super(); - setFieldDelimiter(fieldDelimiter); - } - - public CsvInputFormat(Class<? extends Value> ... fields) { - super(); - setFieldTypes(fields); - } - - public CsvInputFormat(char fieldDelimiter, Class<? extends Value> ... fields) { - super(); - setFieldDelimiter(fieldDelimiter); - setFieldTypes(fields); - } - - // -------------------------------------------------------------------------------------------- - - public void setFieldTypesArray(Class<? extends Value>[] fieldTypes) { - setFieldTypes(fieldTypes); - } - - public void setFieldTypes(Class<? extends Value> ... fieldTypes) { - if (fieldTypes == null) { - throw new IllegalArgumentException("Field types must not be null."); - } - - // sanity check - for (Class<? 
extends Value> type : fieldTypes) { - if (type != null && !Value.class.isAssignableFrom(type)) { - throw new IllegalArgumentException("The types must be subclasses of " + Value.class.getName()); - } - } - - setFieldTypesGeneric(fieldTypes); - } - - public void setFields(int[] sourceFieldIndices, Class<? extends Value>[] fieldTypes) { - Preconditions.checkNotNull(fieldTypes); - - // sanity check - for (Class<? extends Value> type : fieldTypes) { - if (!Value.class.isAssignableFrom(type)) { - throw new IllegalArgumentException("The types must be subclasses of " + Value.class.getName()); - } - } - - setFieldsGeneric(sourceFieldIndices, fieldTypes); - } - - // -------------------------------------------------------------------------------------------- - // Pre-flight: Configuration - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration config) { - super.configure(config); - - if (configured) { - return; - } - - final String fieldDelimStr = config.getString(FIELD_DELIMITER_PARAMETER, null); - if (fieldDelimStr != null) { - setFieldDelimiter(fieldDelimStr); - } - - // read the number of fields configured via the configuration - int numConfigFields = config.getInteger(NUM_FIELDS_PARAMETER, -1); - if (numConfigFields != -1) { - if (numConfigFields <= 0) { - throw new IllegalConfigurationException("The number of fields for the CsvInputFormat is invalid."); - } - - if (getNumberOfNonNullFields() > 0) { - throw new IllegalConfigurationException("Mixing configuration via instance parameters and config parameters is not possible."); - } - - int[] textPosIdx = new int[numConfigFields]; - boolean anyTextPosSet = false; - boolean allTextPosSet = true; - int maxTextPos = -1; - - // parse text positions - for (int i = 0; i < numConfigFields; i++) { - int pos = config.getInteger(TEXT_POSITION_PARAMETER_PREFIX + i, -1); - if (pos == -1) { - allTextPosSet = false; - textPosIdx[i] = i; - maxTextPos = i; - } else { - anyTextPosSet = true; - textPosIdx[i] = pos; - maxTextPos = pos > maxTextPos ? pos : maxTextPos; - } - } - // check if either none or all text positions have been set - if (anyTextPosSet && !allTextPosSet) { - throw new IllegalArgumentException("Invalid configuration for CsvInputFormat: " + - "Not all text positions set"); - } - - // init the array of types to be set. unify the types from the config - // with the types array set on the instance - - // make sure we have a sufficiently large types array - @SuppressWarnings("unchecked") - Class<? extends Value>[] types = (Class<? extends Value>[]) new Class[maxTextPos+1]; - int[] targetPos = new int[maxTextPos+1]; - - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - - // set the fields - try { - for (int i = 0; i < numConfigFields; i++) { - int pos = textPosIdx[i]; - - Class<?
extends Value> clazz = config.getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl).asSubclass(Value.class); - if (clazz == null) { - throw new IllegalConfigurationException("Invalid configuration for CsvInputFormat: " + - "No field parser class for parameter " + i); - } - - types[pos] = clazz; - targetPos[pos] = i; - } - } - catch (ClassNotFoundException e) { - throw new RuntimeException("Could not resolve type classes", e); - } - - // update the field types - setFieldTypes(types); - - // make a dense target pos array - this.targetPositions = new int[numConfigFields]; - for (int i = 0, k = 0; i < targetPos.length; i++) { - if (types[i] != null) { - this.targetPositions[k++] = targetPos[i]; - } - } - } - else { - // not configured via config parameters - if (this.targetPositions.length == 0) { - this.targetPositions = new int[getNumberOfNonNullFields()]; - for (int i = 0; i < this.targetPositions.length; i++) { - this.targetPositions[i] = i; - } - } - } - - if (getNumberOfNonNullFields() == 0) { - throw new IllegalConfigurationException("No fields configured in the CsvInputFormat."); - } - - this.configured = true; - } - - - @Override - public void open(FileInputSplit split) throws IOException { - super.open(split); - - @SuppressWarnings("unchecked") - FieldParser<Value>[] fieldParsers = (FieldParser<Value>[]) getFieldParsers(); - - // create the value holders - this.parsedValues = new Value[fieldParsers.length]; - for (int i = 0; i < fieldParsers.length; i++) { - this.parsedValues[i] = fieldParsers[i].createValue(); - } - - //left to right evaluation makes access [0] okay - //this marker is used to speed up readRecord, so that it doesn't have to check on each call whether the line ending is set to the default - if(this.getDelimiter().length == 1 && this.getDelimiter()[0] == '\n' ) { - this.lineDelimiterIsLinebreak = true; - } - } - - @Override - public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) throws ParseException { - /* - * Fix to support Windows line endings in CSV input files with the standard delimiter setup = \n - */ - //Find the Windows line ending, i.e. a carriage return before the newline - if(this.lineDelimiterIsLinebreak && bytes[offset + numBytes -1] == '\r') { - //reduce the number of bytes so that the carriage return is not taken as data - numBytes--; - } - - if (parseRecord(parsedValues, bytes, offset, numBytes)) { - // valid parse, map values into the record - for (int i = 0; i < parsedValues.length; i++) { - reuse.setField(targetPositions[i], parsedValues[i]); - } - return reuse; - } else { - return null; - } - } - - // ============================================================================================ - // Parameterization via configuration - // ============================================================================================ - - // ------------------------------------- Config Keys ------------------------------------------ - - private static final String FIELD_DELIMITER_PARAMETER = "recordinformat.delimiter.field"; - - private static final String NUM_FIELDS_PARAMETER = "recordinformat.field.number"; - - private static final String FIELD_TYPE_PARAMETER_PREFIX = "recordinformat.field.type_"; - - private static final String TEXT_POSITION_PARAMETER_PREFIX = "recordinformat.text.position_"; - - /** - * Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent - * fashion. - * - * @return A config builder for setting parameters.
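The carriage-return handling in readRecord above is the subtle part: when the configured delimiter is the single byte '\n', a trailing '\r' from Windows files must be excluded before parsing. A standalone sketch of just that check (hypothetical helper, not from the commit):

    // Returns the number of bytes to parse, excluding a trailing '\r',
    // so the carriage return is not interpreted as field data.
    static int effectiveLength(byte[] bytes, int offset, int numBytes) {
        if (numBytes > 0 && bytes[offset + numBytes - 1] == '\r') {
            return numBytes - 1;
        }
        return numBytes;
    }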
- */ - public static ConfigBuilder configureRecordFormat(FileDataSource target) { - return new ConfigBuilder(target, target.getParameters()); - } - - /** - * An abstract builder used to set parameters to the input format's configuration in a fluent way. - */ - protected static class AbstractConfigBuilder<T> extends DelimitedInputFormat.AbstractConfigBuilder<T> { - - protected final RecordFormatCompilerHints hints; - - /** - * Creates a new builder for the given configuration. - * - * @param contract The contract from which the compiler hints are used. - * If contract is null, new compiler hints are generated. - * @param config The configuration into which the parameters will be written. - */ - protected AbstractConfigBuilder(Operator<?> contract, Configuration config) { - super(config); - - if (contract != null) { - this.hints = new RecordFormatCompilerHints(contract.getCompilerHints()); - - // initialize with 2 bytes length for the header (it's actually 3, but one is skipped for the first field) - this.hints.addWidthRecordFormat(2); - } - else { - this.hints = new RecordFormatCompilerHints(new CompilerHints()); - } - } - - // -------------------------------------------------------------------- - - /** - * Sets the delimiter that delimits the individual fields in the record's textual input representation. - * - * @param delimiter The character to be used as a field delimiter. - * @return The builder itself. - */ - public T fieldDelimiter(char delimiter) { - this.config.setString(FIELD_DELIMITER_PARAMETER, String.valueOf(delimiter)); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - public T field(Class<? extends Value> type, int textPosition) { - return field(type, textPosition, Float.NEGATIVE_INFINITY); - } - - public T field(Class<? extends Value> type, int textPosition, float avgLen) { - // register field - final int numYet = this.config.getInteger(NUM_FIELDS_PARAMETER, 0); - this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + numYet, type); - this.config.setInteger(TEXT_POSITION_PARAMETER_PREFIX + numYet, textPosition); - this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 1); - - // register length - if (avgLen == Float.NEGATIVE_INFINITY) { - if (type == IntValue.class) { - avgLen = 4f; - } else if (type == DoubleValue.class || type == LongValue.class) { - avgLen = 8f; - } - } - - if (avgLen != Float.NEGATIVE_INFINITY) { - // add the len, plus one byte for the offset coding - this.hints.addWidthRecordFormat(avgLen + 1); - } - - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - } - - /** - * A builder used to set parameters to the input format's configuration in a fluent way.
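Put together, the builder above was typically applied to a FileDataSource in the style of the old record-API examples; a sketch, with the input path and the field layout as assumptions:

    import org.apache.flink.api.java.record.io.CsvInputFormat;
    import org.apache.flink.api.java.record.operators.FileDataSource;
    import org.apache.flink.types.DoubleValue;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.LongValue;

    public class CsvSourceSketch {
        public static FileDataSource createSource(String path) {
            FileDataSource source = new FileDataSource(new CsvInputFormat(), path, "CSV Input");
            CsvInputFormat.configureRecordFormat(source)
                    .recordDelimiter('\n')        // one record per line
                    .fieldDelimiter('|')          // '|'-separated text fields
                    .field(IntValue.class, 0)     // text position 0 -> record field 0
                    .field(DoubleValue.class, 2)  // text position 2 -> record field 1
                    .field(LongValue.class, 5);   // text position 5 -> record field 2
            return source;
        }
    }

Note that the Record field index is the order of the field(...) calls, while the second argument selects the position in the text line; this is exactly the targetPositions mapping that configure() builds above.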
- */ - public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> { - - protected ConfigBuilder(Operator<?> target, Configuration targetConfig) { - super(target, targetConfig); - } - } - - private static final class RecordFormatCompilerHints extends CompilerHints { - - private float width = 0.0f; - - private RecordFormatCompilerHints(CompilerHints parent) { - copyFrom(parent); - } - - @Override - public float getAvgOutputRecordSize() { - float superWidth = super.getAvgOutputRecordSize(); - if (superWidth > 0.0f || this.width <= 0.0f) { - return superWidth; - } else { - return this.width; - } - } - - private void addWidthRecordFormat(float width) { - this.width += width; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java deleted file mode 100644 index a5d83c3..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/CsvOutputFormat.java +++ /dev/null @@ -1,479 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io; - -import java.io.BufferedOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Writer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.Record; -import org.apache.flink.types.Value; - -/** - * This is an OutputFormat to serialize {@link Record}s to text. The output is - * structured by record delimiters and field delimiters as common in CSV files. - * Record delimiters separate records from each other ('\n' is common). Field - * delimiters separate fields within a record. Record and field delimiters can - * be configured using the CsvOutputFormat {@link Configuration}. - * - * The number of fields to serialize must be configured as well. For each field - * the type of the {@link Value} must be specified using the - * {@link CsvOutputFormat#FIELD_TYPE_PARAMETER_PREFIX} config key and an index - * running from 0 to the number of fields. - * - * The position within the {@link Record} can be configured for each field using - * the {@link CsvOutputFormat#RECORD_POSITION_PARAMETER_PREFIX} config key. - * Either all {@link Record} positions must be configured or none. If none is - * configured, the index of the config key is used.
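Before the class body below: the removed CsvOutputFormat could be parameterized either in code (constructors and setters) or through the config keys described above; a minimal constructor-based sketch, with arbitrarily chosen delimiters and types:

    import org.apache.flink.api.java.record.io.CsvOutputFormat;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.StringValue;

    public class CsvOutputSketch {
        public static CsvOutputFormat createFormat() {
            // record delimiter '\n', field delimiter '|', two typed fields;
            // lenient mode skips null or missing fields instead of throwing
            CsvOutputFormat format = new CsvOutputFormat("\n", "|", IntValue.class, StringValue.class);
            format.setLenient(true);
            return format;
        }
    }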
- * - * @see Value - * @see Configuration - * @see Record - */ -@SuppressWarnings("deprecation") -public class CsvOutputFormat extends FileOutputFormat { - private static final long serialVersionUID = 1L; - - public static final String RECORD_DELIMITER_PARAMETER = "output.record.delimiter"; - - private static final String RECORD_DELIMITER_ENCODING = "output.record.delimiter-encoding"; - - public static final String FIELD_DELIMITER_PARAMETER = "output.record.field-delimiter"; - - public static final String NUM_FIELDS_PARAMETER = "output.record.num-fields"; - - public static final String FIELD_TYPE_PARAMETER_PREFIX = "output.record.type_"; - - public static final String RECORD_POSITION_PARAMETER_PREFIX = "output.record.position_"; - - public static final String LENIENT_PARSING = "output.record.lenient"; - - @SuppressWarnings("unused") - private static final Logger LOG = LoggerFactory.getLogger(CsvOutputFormat.class); - - // -------------------------------------------------------------------------------------------- - - private int numFields; - - private Class<? extends Value>[] classes; - - private int[] recordPositions; - - private Writer wrt; - - private String fieldDelimiter; - - private String recordDelimiter; - - private String charsetName; - - private boolean lenient; - - private boolean ctorInstantiation = false; - - // -------------------------------------------------------------------------------------------- - // Constructors and getters/setters for the configurable parameters - // -------------------------------------------------------------------------------------------- - - public CsvOutputFormat() { - } - - /** - * Creates an instance of CsvOutputFormat. The position of the fields in the - * record is determined by the order in which the classes are given to this - * constructor. '\n' is used as the default record delimiter. - * The default field delimiter is ','. - * - * @param types - * The types of the fields that are in the record. - */ - public CsvOutputFormat(Class<? extends Value>... types) { - this("\n", ",", types); - } - - /** - * Creates an instance of CsvOutputFormat. The position of the fields in the - * record is determined by the order in which the classes are given to this - * constructor. '\n' is used as the default record delimiter. - * - * @param fieldDelimiter - * The delimiter that is used to separate the different fields in - * the record. - * @param types - * The types of the fields that are in the record. - */ - public CsvOutputFormat(String fieldDelimiter, Class<? extends Value>... types) { - this("\n", fieldDelimiter, types); - } - - /** - * Creates an instance of CsvOutputFormat. The position of the fields in the - * record is determined by the order in which the classes are given to this - * constructor. - * - * @param recordDelimiter - * The delimiter that is used to separate the different records. - * @param fieldDelimiter - * The delimiter that is used to separate the different fields in - * the record. - * @param types - * The types of the fields that are in the record. - */ - public CsvOutputFormat(String recordDelimiter, String fieldDelimiter, Class<? extends Value>...
types) { - if (recordDelimiter == null) { - throw new IllegalArgumentException("RecordDelimiter must not be null."); - } - if (fieldDelimiter == null) { - throw new IllegalArgumentException("FieldDelimiter must not be null."); - } - if (types.length == 0) { - throw new IllegalArgumentException("No field types given."); - } - - this.fieldDelimiter = fieldDelimiter; - this.recordDelimiter = recordDelimiter; - this.lenient = false; - - setTypes(types); - ctorInstantiation = true; - } - - public void setTypes(Class<? extends Value>... types) { - this.classes = types; - this.numFields = types.length; - this.recordPositions = new int[types.length]; - for (int i = 0; i < types.length; i++) { - if (types[i] == null) { - throw new IllegalArgumentException("Invalid Constructor Parameter: No type class for parameter " + (2 * i)); - } - this.recordPositions[i] = i; - } - - if (this.fieldDelimiter == null) { - this.fieldDelimiter = ","; - } - - if (this.recordDelimiter == null) { - this.recordDelimiter = "\n"; - } - } - - public void setLenient(boolean lenient) { - this.lenient = lenient; - } - - @Override - public void configure(Configuration parameters) { - super.configure(parameters); - - int configNumFields = parameters.getInteger(NUM_FIELDS_PARAMETER, -1); - - if (ctorInstantiation) { - if (configNumFields > 0) { - throw new IllegalStateException("CsvOutputFormat instantiated via both parameters and config."); - } - return; //already configured, no further actions required - } - - if (configNumFields < 1) { - throw new IllegalStateException("CsvOutputFormat not configured via parameters or config."); - } - - this.numFields = configNumFields; - - @SuppressWarnings("unchecked") - Class<Value>[] arr = new Class[this.numFields]; - this.classes = arr; - - try { - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - - for (int i = 0; i < this.numFields; i++) { - Class<?
extends Value> clazz = parameters.<Value>getClass(FIELD_TYPE_PARAMETER_PREFIX + i, null, cl); - if (clazz == null) { - throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " + "No type class for parameter " + i); - } - - this.classes[i] = clazz; - } - } - catch (ClassNotFoundException e) { - throw new RuntimeException("Could not resolve type classes", e); - } - - this.recordPositions = new int[this.numFields]; - boolean anyRecordPosDefined = false; - boolean allRecordPosDefined = true; - - for (int i = 0; i < this.numFields; i++) { - int pos = parameters.getInteger(RECORD_POSITION_PARAMETER_PREFIX + i, Integer.MIN_VALUE); - if (pos != Integer.MIN_VALUE) { - anyRecordPosDefined = true; - if (pos < 0) { - throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " - + "Invalid record position for parameter " + i); - } - this.recordPositions[i] = pos; - } else { - allRecordPosDefined = false; - this.recordPositions[i] = i; - } - } - - if (anyRecordPosDefined && !allRecordPosDefined) { - throw new IllegalArgumentException("Invalid configuration for CsvOutputFormat: " - + "Either none or all record positions must be defined."); - } - - this.recordDelimiter = parameters.getString(RECORD_DELIMITER_PARAMETER, AbstractConfigBuilder.NEWLINE_DELIMITER); - if (this.recordDelimiter == null) { - throw new IllegalArgumentException("The record delimiter in the CsvOutputFormat must not be null."); - } - this.charsetName = parameters.getString(RECORD_DELIMITER_ENCODING, null); - this.fieldDelimiter = parameters.getString(FIELD_DELIMITER_PARAMETER, ","); - this.lenient = parameters.getBoolean(LENIENT_PARSING, false); - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException - { - super.open(taskNumber, numTasks); - this.wrt = this.charsetName == null ? new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096)) : - new OutputStreamWriter(new BufferedOutputStream(this.stream, 4096), this.charsetName); - } - - @Override - public void close() throws IOException { - if (wrt != null) { - this.wrt.close(); - } - super.close(); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public void writeRecord(Record record) throws IOException { - int numRecFields = record.getNumFields(); - int readPos; - - for (int i = 0; i < this.numFields; i++) { - readPos = this.recordPositions[i]; - if (readPos < numRecFields) { - Value v = record.getField(this.recordPositions[i], this.classes[i]); - if (v != null) { - if (i != 0) { - this.wrt.write(this.fieldDelimiter); - } - this.wrt.write(v.toString()); - } else { - if (this.lenient) { - if (i != 0) { - this.wrt.write(this.fieldDelimiter); - } - } else { - throw new RuntimeException("Cannot serialize record with <null> value at position: " + readPos); - } - } - - } else { - if (this.lenient) { - if (i != 0) { - this.wrt.write(this.fieldDelimiter); - } - } else { - throw new RuntimeException("Cannot serialize record without a field at position: " + readPos); - } - } - - } - - // add the record delimiter - this.wrt.write(this.recordDelimiter); - } - - // ============================================================================================ - - /** - * Creates a configuration builder that can be used to set the output - * format's parameters to the config in a fluent fashion. - * - * @return A config builder for setting parameters.
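The fluent alternative uses the builder defined just below against a FileDataSink; a sketch in the style of the old record-API examples, with the output path and column layout as assumptions:

    import org.apache.flink.api.java.record.io.CsvOutputFormat;
    import org.apache.flink.api.java.record.operators.FileDataSink;
    import org.apache.flink.types.DoubleValue;
    import org.apache.flink.types.IntValue;

    public class CsvSinkSketch {
        public static FileDataSink createSink(String path) {
            FileDataSink sink = new FileDataSink(new CsvOutputFormat(), path, "CSV Output");
            CsvOutputFormat.configureRecordFormat(sink)
                    .recordDelimiter('\n')
                    .fieldDelimiter(',')
                    .field(IntValue.class, 0)    // record position 0 -> first output column
                    .field(DoubleValue.class, 1) // record position 1 -> second output column
                    .lenient(true);              // skip null/missing fields instead of failing
            return sink;
        }
    }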
- */ - public static ConfigBuilder configureRecordFormat(FileDataSink target) { - return new ConfigBuilder(target.getParameters()); - } - - /** - * Abstract builder used to set parameters to the output format's - * configuration in a fluent way. - */ - protected static abstract class AbstractConfigBuilder<T> extends FileOutputFormat.AbstractConfigBuilder<T> { - private static final String NEWLINE_DELIMITER = "\n"; - - // -------------------------------------------------------------------- - - /** - * Creates a new builder for the given configuration. - * - * @param config - * The configuration into which the parameters will be - * written. - */ - protected AbstractConfigBuilder(Configuration config) { - super(config); - } - - // -------------------------------------------------------------------- - - /** - * Sets the delimiter to be a single character, namely the given one. - * The character must be within the value range <code>0</code> to - * <code>127</code>. - * - * @param delimiter - * The delimiter character. - * @return The builder itself. - */ - public T recordDelimiter(char delimiter) { - if (delimiter == '\n') { - this.config.setString(RECORD_DELIMITER_PARAMETER, NEWLINE_DELIMITER); - } else { - this.config.setString(RECORD_DELIMITER_PARAMETER, String.valueOf(delimiter)); - } - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the record delimiter to be the given string. - * - * @param delimiter - * The delimiter string. - * @return The builder itself. - */ - public T recordDelimiter(String delimiter) { - this.config.setString(RECORD_DELIMITER_PARAMETER, delimiter); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the record delimiter to be the given string and the charset - * used to encode the output. The charset must be available on the - * processing nodes, otherwise an exception will be raised at runtime. - * - * @param delimiter - * The delimiter string. - * @param charsetName - * The name of the encoding character set. - * @return The builder itself. - */ - public T recordDelimiter(String delimiter, String charsetName) { - this.config.setString(RECORD_DELIMITER_PARAMETER, delimiter); - this.config.setString(RECORD_DELIMITER_ENCODING, charsetName); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the delimiter that delimits the individual fields in the record's - * textual output representation. - * - * @param delimiter - * The character to be used as a field delimiter. - * @return The builder itself. - */ - public T fieldDelimiter(char delimiter) { - this.config.setString(FIELD_DELIMITER_PARAMETER, String.valueOf(delimiter)); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Adds a field of the record to be serialized to the output. The field - * at the given position will be interpreted as the type represented by - * the given class. The type's {@link Object#toString()} method will be - * invoked to create a textual representation. - * - * @param type - * The type of the field. - * @param recordPosition - * The position in the record. - * @return The builder itself. - */ - public T field(Class<?
extends Value> type, int recordPosition) { - final int numYet = this.config.getInteger(NUM_FIELDS_PARAMETER, 0); - this.config.setClass(FIELD_TYPE_PARAMETER_PREFIX + numYet, type); - this.config.setInteger(RECORD_POSITION_PARAMETER_PREFIX + numYet, recordPosition); - this.config.setInteger(NUM_FIELDS_PARAMETER, numYet + 1); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the leniency for the serializer. A lenient serializer simply - * skips missing fields and null fields in the record, while a - * non-lenient one throws an exception. - * - * @param lenient - * True, if the serializer should be lenient, false - * otherwise. - * @return The builder itself. - */ - public T lenient(boolean lenient) { - this.config.setBoolean(LENIENT_PARSING, lenient); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - } - - /** - * A builder used to set parameters to the output format's configuration in a - * fluent way. - */ - public static final class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> { - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig - * The configuration into which the parameters will be - * written. - */ - protected ConfigBuilder(Configuration targetConfig) { - super(targetConfig); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java deleted file mode 100644 index e74ee49..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedInputFormat.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import org.apache.flink.types.Record; - -/** - * Base implementation for input formats that split the input at a delimiter into records. - * The parsing of the record bytes into the record has to be implemented in the - * {@link #readRecord(Record, byte[], int, int)} method. - * <p> - * The default delimiter is the newline character {@code '\n'}.
- */ -public abstract class DelimitedInputFormat extends org.apache.flink.api.common.io.DelimitedInputFormat<Record> { - - private static final long serialVersionUID = -2297199268758915692L; - - // -------------------------------------------------------------------------------------------- - // User-defined behavior - // -------------------------------------------------------------------------------------------- - - /** - * This function parses the given byte array, which represents one serialized record. - * If the byte array contains invalid content, the record can be skipped by returning - * <tt>null</tt>. - * - * @param reuse The optional reusable holder for the record that is read. - * @param bytes The serialized record. - * @param offset The offset of the record's data within the byte array. - * @param numBytes The number of bytes that make up the record's data. - * @return The parsed record, or <tt>null</tt> if the content is invalid and the record is skipped. - */ - public abstract Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes); -}
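Since the base class above leaves all parsing to readRecord, subclassing was the extension point. A minimal sketch of a concrete subclass (hypothetical tab-separated format, assuming UTF-8 input; not part of the commit):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.java.record.io.DelimitedInputFormat;
    import org.apache.flink.types.Record;
    import org.apache.flink.types.StringValue;

    public class TabSeparatedInputFormat extends DelimitedInputFormat {
        private static final long serialVersionUID = 1L;

        @Override
        public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) {
            String line = new String(bytes, offset, numBytes, StandardCharsets.UTF_8);
            int tab = line.indexOf('\t');
            if (tab < 0) {
                return null; // invalid line: skip this record
            }
            reuse.clear();
            reuse.addField(new StringValue(line.substring(0, tab)));
            reuse.addField(new StringValue(line.substring(tab + 1)));
            return reuse;
        }
    }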