http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java deleted file mode 100644 index 49d9a2a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/DelimitedOutputFormat.java +++ /dev/null @@ -1,319 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io; - - -import java.io.IOException; -import java.io.UnsupportedEncodingException; - -import org.apache.flink.api.java.record.operators.FileDataSink; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.Record; - -/** - * The base class for output formats that serialize their records into a delimited sequence. - */ -@SuppressWarnings("deprecation") -public abstract class DelimitedOutputFormat extends FileOutputFormat { - private static final long serialVersionUID = 1L; - - /** - * The configuration key for the entry that defines the record delimiter. - */ - public static final String RECORD_DELIMITER = "pact.output.delimited.delimiter"; - - /** - * The configuration key to set the record delimiter encoding. - */ - private static final String RECORD_DELIMITER_ENCODING = "pact.output.delimited.delimiter-encoding"; - - /** - * The configuration key for the entry that defines the write-buffer size. - */ - public static final String WRITE_BUFFER_SIZE = "pact.output.delimited.buffersize"; - - /** - * The default write-buffer size. 64 KiByte. - */ - private static final int DEFAULT_WRITE_BUFFER_SIZE = 64 * 1024; - - /** - * The minimal write-buffer size, 1 KiByte. - */ - private static final int MIN_WRITE_BUFFER_SIZE = 1024; - - // -------------------------------------------------------------------------------------------- - - private byte[] delimiter; - - private byte[] buffer; - - private byte[] targetArray = new byte[64]; - - private int pos; - - private int bufferSize; - - // -------------------------------------------------------------------------------------------- - - - /** - * Calls the super classes to configure themselves and reads the config parameters for the delimiter and - * the write buffer size. - * - * @param config The configuration to read the parameters from. 
- * - * @see org.apache.flink.api.java.record.io.FileOutputFormat#configure(org.apache.flink.configuration.Configuration) - */ - public void configure(Configuration config) - { - super.configure(config); - - final String delim = config.getString(RECORD_DELIMITER, "\n"); - final String charsetName = config.getString(RECORD_DELIMITER_ENCODING, null); - if (delim == null) { - throw new IllegalArgumentException("The delimiter in the DelimitedOutputFormat must not be null."); - } - try { - this.delimiter = charsetName == null ? delim.getBytes() : delim.getBytes(charsetName); - } catch (UnsupportedEncodingException useex) { - throw new IllegalArgumentException("The charset with the name '" + charsetName + - "' is not supported on this TaskManager instance.", useex); - } - - this.bufferSize = config.getInteger(WRITE_BUFFER_SIZE, DEFAULT_WRITE_BUFFER_SIZE); - if (this.bufferSize < MIN_WRITE_BUFFER_SIZE) { - throw new IllegalArgumentException("The write buffer size must not be less than " + MIN_WRITE_BUFFER_SIZE - + " bytes."); - } - } - - @Override - public void open(int taskNumber, int numTasks) throws IOException - { - super.open(taskNumber, numTasks); - - if (this.buffer == null) { - this.buffer = new byte[this.bufferSize]; - } - if (this.targetArray == null) { - this.targetArray = new byte[64]; - } - this.pos = 0; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void close() throws IOException { - if (this.stream != null) { - this.stream.write(this.buffer, 0, this.pos); - } - - // close file stream - super.close(); - } - - /** - * This method is called for every record so serialize itself into the given target array. The method should - * return the number of bytes occupied in the target array. If the target array is not large enough, a negative - * value should be returned. - * <p> - * The absolute value of the returned integer can be given as a hint how large an array is required. The array is - * resized to the return value's absolute value, if that is larger than the current array size. Otherwise, the - * array size is simply doubled. - * - * @param rec The record to be serialized. - * @param target The array to serialize the record into. - * @return The length of the serialized contents, or a negative value, indicating that the array is too small. - * - * @throws Exception If the user code produces an exception that prevents processing the record, it should - * throw it such that the engine recognizes the situation as a fault. 
- */ - public abstract int serializeRecord(Record rec, byte[] target) throws Exception; - - - - @Override - public void writeRecord(Record record) throws IOException - { - int size; - try { - while ((size = serializeRecord(record, this.targetArray)) < 0) { - if (-size > this.targetArray.length) { - this.targetArray = new byte[-size]; - } - else { - this.targetArray = new byte[this.targetArray.length * 2]; - } - } - } - catch (Exception ex) { - throw new IOException("Error while serializing the record to bytes: " + ex.getMessage(), ex); - } - - if (this.bufferSize - this.pos > size + this.delimiter.length) { - System.arraycopy(this.targetArray, 0, this.buffer, this.pos, size); - System.arraycopy(this.delimiter, 0, this.buffer, pos + size, this.delimiter.length); - pos += size + this.delimiter.length; - } - else { - // copy the target array (piecewise) - int off = 0; - while (off < size) { - int toCopy = Math.min(size - off, this.bufferSize - this.pos); - System.arraycopy(this.targetArray, off, this.buffer, this.pos, toCopy); - - off += toCopy; - this.pos += toCopy; - if (this.pos == this.bufferSize) { - this.pos = 0; - this.stream.write(this.buffer, 0, this.bufferSize); - } - } - - // copy the delimiter (piecewise) - off = 0; - while (off < this.delimiter.length) { - int toCopy = Math.min(this.delimiter.length - off, this.bufferSize - this.pos); - System.arraycopy(this.delimiter, off, this.buffer, this.pos, toCopy); - - off += toCopy; - this.pos += toCopy; - if (this.pos == this.bufferSize) { - this.pos = 0; - this.stream.write(this.buffer, 0, this.bufferSize); - } - } - } - } - - // ============================================================================================ - - /** - * Creates a configuration builder that can be used to set the input format's parameters to the config in a fluent - * fashion. - * - * @return A config builder for setting parameters. - */ - public static ConfigBuilder configureDelimitedFormat(FileDataSink target) { - return new ConfigBuilder(target.getParameters()); - } - - /** - * A builder used to set parameters to the input format's configuration in a fluent way. - */ - protected static abstract class AbstractConfigBuilder<T> extends FileOutputFormat.AbstractConfigBuilder<T> - { - private static final String NEWLINE_DELIMITER = "\n"; - - // -------------------------------------------------------------------- - - /** - * Creates a new builder for the given configuration. - * - * @param config The configuration into which the parameters will be written. - */ - protected AbstractConfigBuilder(Configuration config) { - super(config); - } - - // -------------------------------------------------------------------- - - /** - * Sets the delimiter to be a single character, namely the given one. The character must be within - * the value range <code>0</code> to <code>127</code>. - * - * @param delimiter The delimiter character. - * @return The builder itself. - */ - public T recordDelimiter(char delimiter) { - if (delimiter == '\n') { - this.config.setString(RECORD_DELIMITER, NEWLINE_DELIMITER); - } else { - this.config.setString(RECORD_DELIMITER, String.valueOf(delimiter)); - } - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the delimiter to be the given string. The string will be converted to bytes for more efficient - * comparison during input parsing. The conversion will be done using the platforms default charset. - * - * @param delimiter The delimiter string. - * @return The builder itself. 
- */ - public T recordDelimiter(String delimiter) { - this.config.setString(RECORD_DELIMITER, delimiter); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the delimiter to be the given string. The string will be converted to bytes for more efficient - * comparison during input parsing. The conversion will be done using the charset with the given name. - * The charset must be available on the processing nodes, otherwise an exception will be raised at - * runtime. - * - * @param delimiter The delimiter string. - * @param charsetName The name of the encoding character set. - * @return The builder itself. - */ - public T recordDelimiter(String delimiter, String charsetName) { - this.config.setString(RECORD_DELIMITER, delimiter); - this.config.setString(RECORD_DELIMITER_ENCODING, charsetName); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - - /** - * Sets the size of the write buffer. - * - * @param sizeInBytes The size of the write buffer in bytes. - * @return The builder itself. - */ - public T writeBufferSize(int sizeInBytes) { - this.config.setInteger(WRITE_BUFFER_SIZE, sizeInBytes); - @SuppressWarnings("unchecked") - T ret = (T) this; - return ret; - } - } - - /** - * A builder used to set parameters to the input format's configuration in a fluent way. - */ - public static class ConfigBuilder extends AbstractConfigBuilder<ConfigBuilder> - { - /** - * Creates a new builder for the given configuration. - * - * @param targetConfig The configuration into which the parameters will be written. - */ - protected ConfigBuilder(Configuration targetConfig) { - super(targetConfig); - } - - } -}
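
[Editor's note] For readers tracking this removal: the serializeRecord contract described above (write into the target array and return the byte count, or a negative size hint when the array is too small) is easiest to see in a concrete subclass. The following is a minimal, hypothetical sketch; the class name TsvOutputFormat and the assumption of two IntValue fields are illustrations, not part of the removed code.

    import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Record;

    /** Hypothetical subclass: emits two IntValue fields as one tab-separated line per record. */
    public class TsvOutputFormat extends DelimitedOutputFormat {
        private static final long serialVersionUID = 1L;

        @Override
        public int serializeRecord(Record rec, byte[] target) throws Exception {
            int f0 = rec.getField(0, IntValue.class).getValue();
            int f1 = rec.getField(1, IntValue.class).getValue();
            byte[] line = (f0 + "\t" + f1).getBytes();

            if (line.length > target.length) {
                // Array too small: return a negative size hint; the caller grows the
                // array to the absolute value (or doubles it) and invokes us again.
                return -line.length;
            }
            System.arraycopy(line, 0, target, 0, line.length);
            return line.length; // bytes occupied; the base class appends the record delimiter
        }
    }

A job would then attach the format to a FileDataSink and tune it through the fluent builder shown above, e.g. DelimitedOutputFormat.configureDelimitedFormat(sink).recordDelimiter('\n').writeBufferSize(1024 * 1024).
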
http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java deleted file mode 100644 index d448c6a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessFixedLengthInputFormat.java +++ /dev/null @@ -1,227 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import java.io.IOException; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; -import org.apache.flink.types.Record; - -/** - * This input format starts an external process and reads its input from the standard out (stdout) of the started process. - * The input is split into fixed-sized segments from which a {@link Record} is generated. - * The external process is started outside of the JVM via a provided start command and can be an arbitrary program, - * e.g., a data generator or a shell script. The input format checks the exit code of the process - * to validate whether the process terminated correctly. A list of allowed exit codes can be provided. - * The input format requires ({@link ExternalProcessInputSplit} objects that hold the command to execute. - * - * <b>Warning:</b> This format does not consume the standard error stream (stderr) of the started process. This might cause deadlocks. - * - * @param <T> The type of the input split (must extend ExternalProcessInputSplit) - */ -public abstract class ExternalProcessFixedLengthInputFormat<T extends ExternalProcessInputSplit> extends ExternalProcessInputFormat<T> { - - private static final long serialVersionUID = 1L; - - /** - * The config parameter which defines the fixed length of a record. - */ - public static final String RECORDLENGTH_PARAMETER_KEY = "pact.input.recordLength"; - - /** - * The default read buffer size = 1MB. 
- */ - private static final int DEFAULT_TARGET_READ_BUFFER_SIZE = 1024 * 1024; - - /** - * Buffer to read a batch of records from a file - */ - private byte[] readBuffer; - - /** - * read position within the read buffer - */ - private int readBufferReadPos; - - /** - * fill marker within the read buffer - */ - private int readBufferFillPos; - - /** - * remaining space within the read buffer - */ - private int readBufferRemainSpace; - - /** - * target size of the read buffer - */ - private int targetReadBufferSize = DEFAULT_TARGET_READ_BUFFER_SIZE; - - /** - * fixed length of all records - */ - protected int recordLength; - - /** - * Flags to indicate the end of the split - */ - private boolean noMoreStreamInput; - private boolean noMoreRecordBuffers; - - /** - * Reads a record out of the given buffer. This operation always consumes the standard number of - * bytes, regardless of whether the produced record was valid. - * - * @param target The target Record - * @param buffer The buffer containing the binary data. - * @param startPos The start position in the byte array. - * @return True, is the record is valid, false otherwise. - */ - public abstract boolean readBytes(Record target, byte[] buffer, int startPos); - - - @Override - public void configure(Configuration parameters) - { - // configure parent - super.configure(parameters); - - // read own parameters - this.recordLength = parameters.getInteger(RECORDLENGTH_PARAMETER_KEY, 0); - if (recordLength < 1) { - throw new IllegalArgumentException("The record length parameter must be set and larger than 0."); - } - - } - - /** - * Sets the target size of the buffer to be used to read from the stdout stream. - * The actual size depends on the record length since it is chosen such that records are not split. - * This method has only an effect, if it is called before the input format is opened. - * - * @param targetReadBufferSize The target size of the read buffer. 
- */ - public void setTargetReadBufferSize(int targetReadBufferSize) - { - this.targetReadBufferSize = targetReadBufferSize; - } - - - @Override - public void open(GenericInputSplit split) throws IOException { - - super.open(split); - - // compute readBufferSize - if(recordLength > this.targetReadBufferSize) { - // read buffer is at least as big as record - this.readBuffer = new byte[recordLength]; - } else if (this.targetReadBufferSize % recordLength == 0) { - // target read buffer size is a multiple of record length, so it's ok - this.readBuffer = new byte[this.targetReadBufferSize]; - } else { - // extent default read buffer size such that records are not split - this.readBuffer = new byte[(recordLength - (this.targetReadBufferSize % recordLength)) + this.targetReadBufferSize]; - } - - // initialize read buffer positions - this.readBufferReadPos = 0; - this.readBufferFillPos = 0; - this.readBufferRemainSpace = readBuffer.length; - // initialize end flags - this.noMoreStreamInput = false; - this.noMoreRecordBuffers = false; - - } - - - @Override - public boolean reachedEnd() throws IOException { - return noMoreRecordBuffers; - } - - - @Override - public Record nextRecord(Record reuse) throws IOException { - // check if read buffer must be filled (less than one record contained) - if(this.readBufferFillPos - this.readBufferReadPos < this.recordLength) { - // try to fill read buffer - if(!this.fillReadBuffer()) { - return null; - } - } - - // update read buffer read marker - this.readBufferReadPos += this.recordLength; - - return this.readBytes(reuse, readBuffer, (this.readBufferReadPos-this.recordLength)) ? reuse : null; - - } - - /** - * Fills the read buffer by reading from the stdout stream of the external process. - * WARNING: We do not read from the error stream. This might cause a deadlock. - * - * @return true if new content was filled into the buffer, false otherwise. - * @throws IOException - */ - private boolean fillReadBuffer() throws IOException { - // TODO: Add reading from error stream of external process. Otherwise the InputFormat might get deadlocked! 
- - // stream was completely processed - if(noMoreStreamInput) { - if(this.readBufferReadPos == this.readBufferFillPos) { - this.noMoreRecordBuffers = true; - return false; - } else { - throw new RuntimeException("External process produced incomplete record"); - } - } - - // the buffer was completely filled and processed - if(this.readBufferReadPos == this.readBuffer.length && - this.readBufferRemainSpace == 0) { - // reset counters and fill again - this.readBufferFillPos = 0; - this.readBufferRemainSpace = this.readBuffer.length; - this.readBufferReadPos = 0; - } - - // as long as not at least one record is complete - while(this.readBufferFillPos - this.readBufferReadPos < this.recordLength) { - // read from stdout - int readCnt = super.extProcOutStream.read(this.readBuffer, this.readBufferFillPos, this.readBufferRemainSpace); - - if(readCnt == -1) { - // the is nothing more to read - this.noMoreStreamInput = true; - return false; - } else { - // update fill position and remain cnt - this.readBufferFillPos += readCnt; - this.readBufferRemainSpace -= readCnt; - } - } - return true; - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java deleted file mode 100644 index cbf16b5..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputFormat.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import java.io.IOException; -import java.io.InputStream; -import java.util.StringTokenizer; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.io.GenericInputSplit; - -/** - * This input format starts an external process and reads its input from the standard out (stdout) of the started process. - * The process is started outside of the JVM via a provided start command and can be an arbitrary program, e.g., a data generator or a shell script. - * The input format checks the exit code of the process to validate whether the process terminated correctly. A list of allowed exit codes can be provided. - * The input format requires ({@link ExternalProcessInputSplit} objects that hold the command to execute. - * - * <b>Attention! </b><br> - * You must take care to read from (and process) both output streams of the process, standard out (stdout) and standard error (stderr). - * Otherwise, the input format might get deadlocked! 
- * - * - * @param <T> The type of the input split (must extend ExternalProcessInputSplit) - */ -public abstract class ExternalProcessInputFormat<T extends ExternalProcessInputSplit> extends GenericInputFormat { - private static final long serialVersionUID = 1L; - - /** - * The config parameter lists (comma separated) all allowed exit codes - */ - public static final String ALLOWEDEXITCODES_PARAMETER_KEY = "pact.input.externalProcess.allowedExitCodes"; - - /** - * The external process - */ - private Process extProc; - - /** - * The stdout stream of the external process - */ - protected InputStream extProcOutStream; - - /** - * The stderr stream of the external process - */ - protected InputStream extProcErrStream; - - /** - * Array of allowed exit codes - */ - protected int[] allowedExitCodes; - - - @Override - public void configure(Configuration parameters) { - // get allowed exit codes - String allowedExitCodesList = parameters.getString(ALLOWEDEXITCODES_PARAMETER_KEY, "0"); - - // parse allowed exit codes - StringTokenizer st = new StringTokenizer(allowedExitCodesList, ","); - this.allowedExitCodes = new int[st.countTokens()]; - - for(int i=0; i<this.allowedExitCodes.length; i++) { - this.allowedExitCodes[i] = Integer.parseInt(st.nextToken().trim()); - } - - } - - @Override - public void close() throws IOException { - try { - // get exit code - int exitCode = this.extProc.exitValue(); - // check whether exit code is allowed - boolean exitCodeOk = false; - for (int allowedExitCode : this.allowedExitCodes) { - if (allowedExitCode == exitCode) { - exitCodeOk = true; - break; - } - } - if(!exitCodeOk) { - // external process did not finish with an allowed exit code - throw new RuntimeException("External process did not finish with an allowed exit code: "+exitCode); - } - } catch(IllegalThreadStateException itse) { - // process did not terminate yet, shut it down! 
- this.extProc.destroy(); - if(!this.reachedEnd()) { - throw new RuntimeException("External process was destroyed although stream was not fully read."); - } - } finally { - this.extProcErrStream.close(); - this.extProcOutStream.close(); - } - } - - @Override - public void open(GenericInputSplit split) throws IOException { - - if(!(split instanceof ExternalProcessInputSplit)) { - throw new IOException("Invalid InputSplit type."); - } - - ExternalProcessInputSplit epSplit = (ExternalProcessInputSplit)split; - - // check if process command is valid string - if(epSplit.getExternalProcessCommand() != null && !epSplit.getExternalProcessCommand().equals("")) { - try { - // run the external process - this.extProc = Runtime.getRuntime().exec(epSplit.getExternalProcessCommand()); - } catch (IOException e) { - throw new IOException("IO Exception when starting external process: "+epSplit.getExternalProcessCommand()); - } - // connect streams to stdout and stderr - this.extProcOutStream = this.extProc.getInputStream(); - this.extProcErrStream = this.extProc.getErrorStream(); - } else { - throw new IllegalArgumentException("External Process Command not set"); - } - } - - public void waitForProcessToFinish() throws InterruptedException { - extProc.waitFor(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java deleted file mode 100644 index e087cb1..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/ExternalProcessInputSplit.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.api.java.record.io; - -import org.apache.flink.core.io.GenericInputSplit; - -/** - * The ExternalProcessInputSplit contains all information for {@link org.apache.flink.api.common.io.InputFormat} - * that read their data from external processes. - * Each parallel instance of an InputFormat starts an external process and reads its output. - * The command to start the external process must be executable on all nodes. 
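
[Editor's note] To illustrate how the removed external-process formats fit together — the split carries the command, the format launches the process and parses fixed-size chunks from its stdout via readBytes — here is a hedged sketch. The class name IntStreamInputFormat, the generator command path, and the 4-byte record layout are assumptions; the record length must additionally be supplied under pact.input.recordLength (4 here), and, as the warning above notes, stderr is not drained, so a process that writes much to stderr can still deadlock.

    import org.apache.flink.api.java.record.io.ExternalProcessFixedLengthInputFormat;
    import org.apache.flink.api.java.record.io.ExternalProcessInputSplit;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Record;

    /** Hypothetical format: every record is one 4-byte big-endian integer written to stdout. */
    public class IntStreamInputFormat extends ExternalProcessFixedLengthInputFormat<ExternalProcessInputSplit> {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean readBytes(Record target, byte[] buffer, int startPos) {
            // Decode exactly recordLength (= 4) bytes; the bytes are consumed either way.
            int value = ((buffer[startPos] & 0xFF) << 24)
                    | ((buffer[startPos + 1] & 0xFF) << 16)
                    | ((buffer[startPos + 2] & 0xFF) << 8)
                    |  (buffer[startPos + 3] & 0xFF);
            target.setField(0, new IntValue(value));
            return true; // returning false would drop the record but still advance the read position
        }

        @Override
        public ExternalProcessInputSplit[] createInputSplits(int minNumSplits) {
            // One split per parallel instance; each split runs the same (assumed) generator command.
            ExternalProcessInputSplit[] splits = new ExternalProcessInputSplit[minNumSplits];
            for (int i = 0; i < minNumSplits; i++) {
                splits[i] = new ExternalProcessInputSplit(i, minNumSplits, "/usr/local/bin/generate-ints");
            }
            return splits;
        }
    }
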
- * - * @see ExternalProcessInputFormat - * @see ExternalProcessFixedLengthInputFormat - */ -public class ExternalProcessInputSplit extends GenericInputSplit { - - private static final long serialVersionUID = 1L; - - // command to be executed for this input split - private final String extProcessCommand; - - /** - * Instantiates an ExternalProcessInputSplit - * - * @param splitNumber The number of the input split - * @param extProcCommand The command to be executed for the input split - */ - public ExternalProcessInputSplit(int splitNumber, int numSplits, String extProcCommand) { - super(splitNumber, numSplits); - this.extProcessCommand = extProcCommand; - } - - /** - * Returns the command to be executed to derive the input for this split - * - * @return the command to be executed to derive the input for this split - */ - public String getExternalProcessCommand() { - return this.extProcessCommand; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java deleted file mode 100644 index c73da71..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileInputFormat.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import org.apache.flink.types.Record; - -/** - * The base interface for input formats that read {@link Record}s from a - * file. - */ -public abstract class FileInputFormat extends org.apache.flink.api.common.io.FileInputFormat<Record> { - - private static final long serialVersionUID = -8819984594406641418L; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java deleted file mode 100644 index c9591ee..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FileOutputFormat.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import org.apache.flink.types.Record; - - -/** - * The abstract base class for all output formats that are file based. Contains the logic to open/close the target - * file streams. - */ -public abstract class FileOutputFormat extends org.apache.flink.api.common.io.FileOutputFormat<Record> { - - private static final long serialVersionUID = 3832934435044920834L; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java deleted file mode 100644 index e544eb9..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/FixedLengthInputFormat.java +++ /dev/null @@ -1,245 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - - -import java.io.IOException; - -import org.apache.flink.api.common.io.statistics.BaseStatistics; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.types.Record; - -/** - * - */ -public abstract class FixedLengthInputFormat extends FileInputFormat { - private static final long serialVersionUID = 1L; - - /** - * The config parameter which defines the fixed length of a record. - */ - public static final String RECORDLENGTH_PARAMETER_KEY = "pact.fix-input.record-length"; - - /** - * The default read buffer size = 1MB. - */ - private static final int DEFAULT_READ_BUFFER_SIZE = 1024 * 1024; - - /** - * Buffer to read a batch of records from a file - */ - private byte[] readBuffer; - - /** - * The position in the stream - */ - private long streamPos; - - /** - * The end position in the stream. - */ - private long streamEnd; - - /** - * read position within the read buffer - */ - private int readBufferPos; - - /** - * The limit of the data in the read buffer. 
- */ - private int readBufferLimit; - - /** - * fixed length of all records - */ - private int recordLength; - - /** - * size of the read buffer - */ - private int readBufferSize = DEFAULT_READ_BUFFER_SIZE; - - /** - * The flag whether the stream is exhausted. - */ - private boolean exhausted; - - // -------------------------------------------------------------------------------------------- - - /** - * Constructor only sets the key and value classes - */ - protected FixedLengthInputFormat() {} - - /** - * Reads a record out of the given buffer. This operation always consumes the standard number of - * bytes, regardless of whether the produced record was valid. - * - * @param target The target Record - * @param buffer The buffer containing the binary data. - * @param startPos The start position in the byte array. - * @return True, is the record is valid, false otherwise. - */ - public abstract boolean readBytes(Record target, byte[] buffer, int startPos); - - /** - * Returns the fixed length of a record. - * - * @return the fixed length of a record. - */ - public int getRecordLength() { - return this.recordLength; - } - - /** - * Gets the size of the buffer internally used to parse record boundaries. - * - * @return The size of the parsing buffer. - */ - public int getReadBufferSize() { - return this.readBuffer.length; - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public void configure(Configuration parameters) { - // pass parameters to FileInputFormat - super.configure(parameters); - - // read own parameters - this.recordLength = parameters.getInteger(RECORDLENGTH_PARAMETER_KEY, 0); - if (recordLength < 1) { - throw new IllegalArgumentException("The record length parameter must be set and larger than 0."); - } - } - - @Override - public void open(FileInputSplit split) throws IOException { - // open input split using FileInputFormat - super.open(split); - - // adjust the stream positions for boundary splits - int recordOffset = (int) (this.splitStart % this.recordLength); - if(recordOffset != 0) { - // move start to next boundary - super.stream.seek(this.splitStart + recordOffset); - } - this.streamPos = this.splitStart + recordOffset; - this.streamEnd = this.splitStart + this.splitLength; - this.streamEnd += this.streamEnd % this.recordLength; - - // adjust readBufferSize - this.readBufferSize += this.recordLength - (this.readBufferSize % this.recordLength); - - if (this.readBuffer == null || this.readBuffer.length != this.readBufferSize) { - this.readBuffer = new byte[this.readBufferSize]; - } - - this.readBufferLimit = 0; - this.readBufferPos = 0; - this.exhausted = false; - fillReadBuffer(); - } - - /** - * {@inheritDoc} - * @throws IOException - */ - @Override - public FileBaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { - final FileBaseStatistics stats = super.getStatistics(cachedStats); - return stats == null ? 
null : - new FileBaseStatistics(stats.getLastModificationTime(), stats.getTotalInputSize(), this.recordLength); - } - - // -------------------------------------------------------------------------------------------- - - - @Override - public boolean reachedEnd() { - return this.exhausted; - } - - - @Override - public Record nextRecord(Record reuse) throws IOException { - // check if read buffer contains another full record - if (this.readBufferLimit - this.readBufferPos <= 0) { - // get another buffer - fillReadBuffer(); - // check if source is exhausted - if (this.exhausted) { - return null; - } - } - else if (this.readBufferLimit - this.readBufferPos < this.recordLength) { - throw new IOException("Unable to read full record"); - } - - boolean val = readBytes(reuse, this.readBuffer, this.readBufferPos); - - this.readBufferPos += this.recordLength; - if (this.readBufferPos >= this.readBufferLimit) { - fillReadBuffer(); - } - return val ? reuse : null; - } - - /** - * Fills the next read buffer from the file stream. - * - * @throws IOException - */ - private void fillReadBuffer() throws IOException { - // special case for compressed files. - if(splitLength == FileInputFormat.READ_WHOLE_SPLIT_FLAG) { - int read = this.stream.read(this.readBuffer, 0, this.readBufferSize); - if (read == -1) { - exhausted = true; - } else { - this.streamPos += read; - this.readBufferPos = 0; - this.readBufferLimit = read; - } - return; - } - - int toRead = (int) Math.min(this.streamEnd - this.streamPos, this.readBufferSize); - if (toRead <= 0) { - this.exhausted = true; - return; - } - - // fill read buffer - int read = this.stream.read(this.readBuffer, 0, toRead); - - if (read <= 0) { - this.exhausted = true; - } else { - this.streamPos += read; - this.readBufferPos = 0; - this.readBufferLimit = read; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java deleted file mode 100644 index 46fa6e5..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/GenericInputFormat.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import org.apache.flink.types.Record; - -/** - * Generic base class for all inputs that are not based on files, specific to Record. 
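
[Editor's note] The same readBytes pattern applies to the removed file-based FixedLengthInputFormat above; the difference is that split boundaries and the read buffer are aligned to the record length taken from the configuration (pact.fix-input.record-length). A minimal, hypothetical sketch for 8-byte records follows; the class name and field layout are assumptions.

    import org.apache.flink.api.java.record.io.FixedLengthInputFormat;
    import org.apache.flink.types.LongValue;
    import org.apache.flink.types.Record;

    /** Hypothetical format: every record is one 8-byte big-endian long. */
    public class LongRecordInputFormat extends FixedLengthInputFormat {
        private static final long serialVersionUID = 1L;

        @Override
        public boolean readBytes(Record target, byte[] buffer, int startPos) {
            long value = 0;
            for (int i = 0; i < 8; i++) {
                value = (value << 8) | (buffer[startPos + i] & 0xFF);
            }
            target.setField(0, new LongValue(value));
            return true;
        }
    }

The length itself is not hard-coded in the subclass: configure(...) rejects the format unless pact.fix-input.record-length is set (to 8 in this example) in the configuration passed to the operator.
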
- */ -public abstract class GenericInputFormat extends org.apache.flink.api.common.io.GenericInputFormat<Record> { - private static final long serialVersionUID = 1L; -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java deleted file mode 100644 index c606458..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/io/TextInputFormat.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.io; - -import java.nio.ByteBuffer; -import java.nio.CharBuffer; -import java.nio.charset.CharacterCodingException; -import java.nio.charset.Charset; -import java.nio.charset.CharsetDecoder; -import java.util.Arrays; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.types.Record; -import org.apache.flink.types.StringValue; - -/** - * Base implementation for an input format that returns each line as a separate record that contains - * only a single string, namely the line. 
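
[Editor's note] Usage of the removed TextInputFormat is driven entirely by the charset and field-position keys defined in the class just below. A small, hedged sketch (the input path and the target field position are assumptions): read ISO-8859-1 encoded lines and place each line as a StringValue into field 1 of the emitted Record.

    import org.apache.flink.api.java.record.io.TextInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;

    public class TextInputFormatExample {
        public static TextInputFormat createFormat() {
            TextInputFormat format = new TextInputFormat();
            format.setFilePath(new Path("hdfs:///data/lines.txt")); // hypothetical path

            Configuration conf = new Configuration();
            conf.setString(TextInputFormat.CHARSET_NAME, "ISO-8859-1"); // handled via the fast ASCII path
            conf.setInteger(TextInputFormat.FIELD_POS, 1);              // target field in the emitted Record
            format.configure(conf);
            return format;
        }
    }
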
- */ -public class TextInputFormat extends DelimitedInputFormat { - private static final long serialVersionUID = 1L; - - public static final String CHARSET_NAME = "textformat.charset"; - - public static final String FIELD_POS = "textformat.pos"; - - public static final String DEFAULT_CHARSET_NAME = "UTF-8"; - - private static final Logger LOG = LoggerFactory.getLogger(TextInputFormat.class); - - - protected final StringValue theString = new StringValue(); - - - // all fields below here are set in configure / open, so we do not serialze them - - protected transient CharsetDecoder decoder; - - protected transient ByteBuffer byteWrapper; - - protected transient int pos; - - protected transient boolean ascii; - - /** - * Code of \r, used to remove \r from a line when the line ends with \r\n - */ - private static final byte CARRIAGE_RETURN = (byte) '\r'; - - /** - * Code of \n, used to identify if \n is used as delimiter - */ - private static final byte NEW_LINE = (byte) '\n'; - - - // -------------------------------------------------------------------------------------------- - - @Override - public void configure(Configuration parameters) { - super.configure(parameters); - - // get the charset for the decoding - String charsetName = parameters.getString(CHARSET_NAME, DEFAULT_CHARSET_NAME); - if (charsetName == null || !Charset.isSupported(charsetName)) { - throw new RuntimeException("Unsupported charset: " + charsetName); - } - - if (charsetName.equals("ISO-8859-1") || charsetName.equalsIgnoreCase("ASCII")) { - this.ascii = true; - } else { - this.decoder = Charset.forName(charsetName).newDecoder(); - this.byteWrapper = ByteBuffer.allocate(1); - } - - // get the field position to write in the record - this.pos = parameters.getInteger(FIELD_POS, 0); - if (this.pos < 0) { - throw new RuntimeException("Illegal configuration value for the target position: " + this.pos); - } - } - - // -------------------------------------------------------------------------------------------- - - public Record readRecord(Record reuse, byte[] bytes, int offset, int numBytes) { - StringValue str = this.theString; - - //Check if \n is used as delimiter and the end of this line is a \r, then remove \r from the line - if (this.getDelimiter() != null && this.getDelimiter().length == 1 - && this.getDelimiter()[0] == NEW_LINE && offset+numBytes >= 1 - && bytes[offset+numBytes-1] == CARRIAGE_RETURN){ - numBytes -= 1; - } - - - if (this.ascii) { - str.setValueAscii(bytes, offset, numBytes); - } - else { - ByteBuffer byteWrapper = this.byteWrapper; - if (bytes != byteWrapper.array()) { - byteWrapper = ByteBuffer.wrap(bytes, 0, bytes.length); - this.byteWrapper = byteWrapper; - } - byteWrapper.limit(offset + numBytes); - byteWrapper.position(offset); - - try { - CharBuffer result = this.decoder.decode(byteWrapper); - str.setValue(result); - } - catch (CharacterCodingException e) { - byte[] copy = new byte[numBytes]; - System.arraycopy(bytes, offset, copy, 0, numBytes); - LOG.warn("Line could not be encoded: " + Arrays.toString(copy), e); - return null; - } - } - - reuse.clear(); - reuse.setField(this.pos, str); - return reuse; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java 
b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java deleted file mode 100644 index 29dab6a..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/BulkIteration.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.operators; - -import org.apache.flink.api.common.operators.base.BulkIterationBase; -import org.apache.flink.types.Record; - -/** - * * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - */ -@Deprecated -public class BulkIteration extends BulkIterationBase<Record> { - public BulkIteration() { - super(OperatorInfoHelper.unary()); - } - - public BulkIteration(String name) { - super(OperatorInfoHelper.unary(), name); - } - - /** - * Specialized operator to use as a recognizable place-holder for the input to the - * step function when composing the nested data flow. - */ - public static class PartialSolutionPlaceHolder extends BulkIterationBase.PartialSolutionPlaceHolder<Record> { - public PartialSolutionPlaceHolder(BulkIterationBase<Record> container) { - super(container, OperatorInfoHelper.unary()); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java deleted file mode 100644 index 09811d6..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CoGroupOperator.java +++ /dev/null @@ -1,399 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.java.record.operators; - -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.Ordering; -import org.apache.flink.api.common.operators.RecordOperator; -import org.apache.flink.api.common.operators.base.CoGroupOperatorBase; -import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.api.java.operators.translation.WrappingFunction; -import org.apache.flink.api.java.record.functions.CoGroupFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation; -import org.apache.flink.types.Key; -import org.apache.flink.types.Record; -import org.apache.flink.util.Collector; -import org.apache.flink.util.InstantiationUtil; - -import com.google.common.base.Preconditions; - -/** - * CoGroupOperator that applies a {@link CoGroupFunction} to groups of records sharing - * the same key (one group per input). - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * @see CoGroupFunction - */ -@Deprecated -public class CoGroupOperator extends CoGroupOperatorBase<Record, Record, Record, org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> implements RecordOperator { - - /** - * The types of the keys that the operator groups on. - */ - private final Class<? extends Key<?>>[] keyTypes; - - // -------------------------------------------------------------------------------------------- - - /** - * Creates a Builder with the provided {@link CoGroupFunction} implementation. - * - * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator. - * @param keyClass The class of the key data type. - * @param keyColumn1 The position of the key in the first input's records. - * @param keyColumn2 The position of the key in the second input's records. - */ - public static Builder builder(CoGroupFunction udf, Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) { - WrappingCoGroupFunction wrapper = new WrappingCoGroupFunction(udf); - return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2); - } - - /** - * Creates a Builder with the provided {@link CoGroupFunction} implementation. - * - * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator. - * @param keyClass The class of the key data type. - * @param keyColumn1 The position of the key in the first input's records. - * @param keyColumn2 The position of the key in the second input's records. - */ - public static Builder builder(Class<? extends CoGroupFunction> udf, Class<? extends Key<?>> keyClass, - int keyColumn1, int keyColumn2) - { - WrappingCoGroupFunction wrapper = new WrappingClassCoGroupFunction(udf); - return new Builder(new UserCodeObjectWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>>(wrapper), keyClass, keyColumn1, keyColumn2); - } - - /** - * The private constructor that only gets invoked from the Builder. 
- * @param builder - */ - protected CoGroupOperator(Builder builder) { - super(builder.udf, OperatorInfoHelper.binary(), builder.getKeyColumnsArray1(), builder.getKeyColumnsArray2(), builder.name); - this.keyTypes = builder.getKeyClassesArray(); - - if (builder.inputs1 != null && !builder.inputs1.isEmpty()) { - setFirstInput(Operator.createUnionCascade(builder.inputs1)); - } - if (builder.inputs2 != null && !builder.inputs2.isEmpty()) { - setSecondInput(Operator.createUnionCascade(builder.inputs2)); - } - - // sanity check solution set key mismatches - if (input1 instanceof DeltaIteration.SolutionSetPlaceHolder) { - int[] positions = getKeyColumns(0); - ((DeltaIteration.SolutionSetPlaceHolder) input1).checkJoinKeyFields(positions); - } - if (input2 instanceof DeltaIteration.SolutionSetPlaceHolder) { - int[] positions = getKeyColumns(1); - ((DeltaIteration.SolutionSetPlaceHolder) input2).checkJoinKeyFields(positions); - } - - setBroadcastVariables(builder.broadcastInputs); - setGroupOrderForInputOne(builder.secondaryOrder1); - setGroupOrderForInputTwo(builder.secondaryOrder2); - - CoGroupFunction function = ((WrappingCoGroupFunction) builder.udf.getUserCodeObject()).getWrappedFunction(); - setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(new UserCodeObjectWrapper<CoGroupFunction>(function))); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public Class<? extends Key<?>>[] getKeyClasses() { - return this.keyTypes; - } - - // -------------------------------------------------------------------------------------------- - - - /** - * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition). - */ - public static class Builder { - - /* The required parameters */ - private final UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf; - private final List<Class<? extends Key<?>>> keyClasses; - private final List<Integer> keyColumns1; - private final List<Integer> keyColumns2; - - /* The optional parameters */ - private List<Operator<Record>> inputs1; - private List<Operator<Record>> inputs2; - private Map<String, Operator<Record>> broadcastInputs; - private Ordering secondaryOrder1; - private Ordering secondaryOrder2; - private String name; - - /** - * Creates a Builder with the provided {@link CoGroupFunction} implementation. - * - * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator. - * @param keyClass The class of the key data type. - * @param keyColumn1 The position of the key in the first input's records. - * @param keyColumn2 The position of the key in the second input's records. - */ - protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf, Class<? extends Key<?>> keyClass, - int keyColumn1, int keyColumn2) - { - this.udf = udf; - this.keyClasses = new ArrayList<Class<? extends Key<?>>>(); - this.keyClasses.add(keyClass); - this.keyColumns1 = new ArrayList<Integer>(); - this.keyColumns1.add(keyColumn1); - this.keyColumns2 = new ArrayList<Integer>(); - this.keyColumns2.add(keyColumn2); - this.inputs1 = new ArrayList<Operator<Record>>(); - this.inputs2 = new ArrayList<Operator<Record>>(); - this.broadcastInputs = new HashMap<String, Operator<Record>>(); - } - - /** - * Creates a Builder with the provided {@link CoGroupFunction} implementation. This method is intended - * for special case sub-types only. 
- * - * @param udf The {@link CoGroupFunction} implementation for this CoGroup operator. - */ - protected Builder(UserCodeWrapper<org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record>> udf) { - this.udf = udf; - this.keyClasses = new ArrayList<Class<? extends Key<?>>>(); - this.keyColumns1 = new ArrayList<Integer>(); - this.keyColumns2 = new ArrayList<Integer>(); - this.inputs1 = new ArrayList<Operator<Record>>(); - this.inputs2 = new ArrayList<Operator<Record>>(); - this.broadcastInputs = new HashMap<String, Operator<Record>>(); - } - - private int[] getKeyColumnsArray1() { - int[] result = new int[keyColumns1.size()]; - for (int i = 0; i < keyColumns1.size(); ++i) { - result[i] = keyColumns1.get(i); - } - return result; - } - - private int[] getKeyColumnsArray2() { - int[] result = new int[keyColumns2.size()]; - for (int i = 0; i < keyColumns2.size(); ++i) { - result[i] = keyColumns2.get(i); - } - return result; - } - - @SuppressWarnings("unchecked") - private Class<? extends Key<?>>[] getKeyClassesArray() { - return keyClasses.toArray(new Class[keyClasses.size()]); - } - - /** - * Adds additional key field. - * - * @param keyClass The class of the key data type. - * @param keyColumn1 The position of the key in the first input's records. - * @param keyColumn2 The position of the key in the second input's records. - */ - public Builder keyField(Class<? extends Key<?>> keyClass, int keyColumn1, int keyColumn2) { - keyClasses.add(keyClass); - keyColumns1.add(keyColumn1); - keyColumns2.add(keyColumn2); - return this; - } - /** - * Sets the order of the elements within a group for the first input. - * - * @param order The order for the elements in a group. - */ - public Builder secondaryOrder1(Ordering order) { - this.secondaryOrder1 = order; - return this; - } - - /** - * Sets the order of the elements within a group for the second input. - * - * @param order The order for the elements in a group. - */ - public Builder secondaryOrder2(Ordering order) { - this.secondaryOrder2 = order; - return this; - } - - /** - * Sets the input operator for input 1. - * - * @param input The input operator for input 1. - */ - public Builder input1(Operator<Record> input) { - Preconditions.checkNotNull(input, "The input must not be null"); - - this.inputs1.clear(); - this.inputs1.add(input); - return this; - } - - /** - * Sets one or several inputs (union) for input 1. - * - * @param inputs - */ - public Builder input1(Operator<Record>...inputs) { - this.inputs1.clear(); - for (Operator<Record> c : inputs) { - this.inputs1.add(c); - } - return this; - } - - /** - * Sets the input operator for input 2. - * - * @param input The input operator for input 2. - */ - public Builder input2(Operator<Record> input) { - Preconditions.checkNotNull(input, "The input must not be null"); - - this.inputs2.clear(); - this.inputs2.add(input); - return this; - } - - /** - * Sets one or several inputs (union) for input 2. - * - * @param inputs - */ - public Builder input2(Operator<Record>...inputs) { - this.inputs2.clear(); - for (Operator<Record> c : inputs) { - this.inputs2.add(c); - } - return this; - } - - /** - * Sets the first inputs. - * - * @param inputs - */ - public Builder inputs1(List<Operator<Record>> inputs) { - this.inputs1 = inputs; - return this; - } - - /** - * Sets the second inputs. 
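
[Editor's note] Since the operator class is being removed, a short usage sketch of the Builder shown in this diff may help readers migrating code: a hypothetical CoGroupFunction is bound to two Record inputs keyed on an IntValue at position 0 of each side. The UDF body and the source operators are placeholders, not part of the removed code.

    import java.util.Iterator;

    import org.apache.flink.api.common.operators.Operator;
    import org.apache.flink.api.java.record.functions.CoGroupFunction;
    import org.apache.flink.api.java.record.operators.CoGroupOperator;
    import org.apache.flink.types.IntValue;
    import org.apache.flink.types.Record;
    import org.apache.flink.util.Collector;

    public class CoGroupExample {

        /** Hypothetical UDF: simply forwards all records of both groups. */
        public static class ForwardingCoGrouper extends CoGroupFunction {
            private static final long serialVersionUID = 1L;

            @Override
            public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
                while (records1.hasNext()) { out.collect(records1.next()); }
                while (records2.hasNext()) { out.collect(records2.next()); }
            }
        }

        /** Wires the operator via the fluent Builder from the removed class. */
        public static CoGroupOperator wire(Operator<Record> source1, Operator<Record> source2) {
            return CoGroupOperator.builder(ForwardingCoGrouper.class, IntValue.class, 0, 0)
                    .input1(source1)
                    .input2(source2)
                    .name("CoGroup on field 0")
                    .build();
        }
    }
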
- * - * @param inputs - */ - public Builder inputs2(List<Operator<Record>> inputs) { - this.inputs2 = inputs; - return this; - } - - /** - * Binds the result produced by a plan rooted at {@code root} to a - * variable used by the UDF wrapped in this operator. - */ - public Builder setBroadcastVariable(String name, Operator<Record> input) { - this.broadcastInputs.put(name, input); - return this; - } - - /** - * Binds multiple broadcast variables. - */ - public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) { - this.broadcastInputs.clear(); - this.broadcastInputs.putAll(inputs); - return this; - } - - /** - * Sets the name of this operator. - * - * @param name - */ - public Builder name(String name) { - this.name = name; - return this; - } - - /** - * Creates and returns a CoGroupOperator from using the values given - * to the builder. - * - * @return The created operator - */ - public CoGroupOperator build() { - if (keyClasses.size() <= 0) { - throw new IllegalStateException("At least one key attribute has to be set."); - } - - if (name == null) { - name = udf.getUserCodeClass().getName(); - } - return new CoGroupOperator(this); - } - } - - // ============================================================================================ - - public static class WrappingCoGroupFunction extends WrappingFunction<CoGroupFunction> - implements org.apache.flink.api.common.functions.CoGroupFunction<Record, Record, Record> { - - private static final long serialVersionUID = 1L; - - public WrappingCoGroupFunction(CoGroupFunction coGrouper) { - super(coGrouper); - } - - @Override - public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) throws Exception { - this.wrappedFunction.coGroup(records1.iterator(), records2.iterator(), out); - } - } - - public static final class WrappingClassCoGroupFunction extends WrappingCoGroupFunction { - - private static final long serialVersionUID = 1L; - - public WrappingClassCoGroupFunction(Class<? extends CoGroupFunction> reducer) { - super(InstantiationUtil.instantiate(reducer)); - } - - private void writeObject(ObjectOutputStream out) throws IOException { - out.writeObject(wrappedFunction.getClass()); - } - - private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { - Class<?> clazz = (Class<?>) in.readObject(); - this.wrappedFunction = (CoGroupFunction) InstantiationUtil.instantiate(clazz); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java deleted file mode 100644 index 2b84b73..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CollectionDataSource.java +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.operators; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; - -import org.apache.flink.api.common.io.GenericInputFormat; -import org.apache.flink.api.common.operators.GenericDataSourceBase; -import org.apache.flink.api.java.record.io.CollectionInputFormat; -import org.apache.flink.types.Record; - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * Operator for input nodes which reads data from a collection or an iterator. - * Use this operator if you want to use a collection of data from the application - * as an input for a job on the cluster. - * There are two main ways to use the CollectionDataSource: - * - * Using an {@link java.util.Iterator} that is also {@link java.io.Serializable}. - * <pre> - * CollectionDataSource source = new CollectionDataSource(mySerializableIterator, "IterSource"); - * </pre> - * - * Using a Collection of Java Objects. - * - * <pre> - * CollectionDataSource source2 = new CollectionDataSource(new ArrayList<String>(), "Collection source"); - * </pre> - * - * Note that you can pass as many elements as you want to the constructor: - * - * <pre> - * CollectionDataSource source3 = new CollectionDataSource("Varargs String source", "some", "strings", "that", "get", "distributed"); - * </pre> - * - * The only limitation is that the elements need to have the same type. - */ -@Deprecated -public class CollectionDataSource extends GenericDataSourceBase<Record, GenericInputFormat<Record>> { - - private static String DEFAULT_NAME = "<Unnamed Collection Data Source>"; - - /** - * Creates a new instance for the given input using the given input format. - * - * @param f - * The {@link CollectionInputFormat} implementation used to read the data. - * @param data - * The input data. It should be a collection, an array or a serializable iterator. - * @param name - * The given name for the Pact, used in plans, logs and progress messages. - */ - public CollectionDataSource(CollectionInputFormat f, String name, Object...
data) { - super(f, OperatorInfoHelper.source(), name); - Collection<Object> tmp = new ArrayList<Object>(); - for (Object o : data) { - tmp.add(o); - } - checkFormat(tmp); - f.setData(tmp); - } - - public CollectionDataSource(CollectionInputFormat f, String name, Object[][] data) { - super(f, OperatorInfoHelper.source(), name); - Collection<Object> tmp = new ArrayList<Object>(); - for (Object o : data) { - tmp.add(o); - } - checkFormat(tmp); - f.setData(tmp); - } - - public CollectionDataSource(CollectionInputFormat f, Collection<?> data, String name) { - super(f, OperatorInfoHelper.source(), name); - checkFormat(data); - f.setData(data); - } - - public <T extends Iterator<?>, Serializable> CollectionDataSource(CollectionInputFormat f, T data, String name) { - super(f, OperatorInfoHelper.source(), name); - f.setIter(data); - } - - /** - * Creates a new instance for the given input, using a default {@link CollectionInputFormat}. - * The input types will be checked. If the input types don't agree, an exception will occur. - * - * @param args - * The input data. It should be a collection, an array or a serializable iterator. - * @param name - * The given name for the Pact, used in plans, logs and progress messages. - */ - public CollectionDataSource(String name, Object... args) { - this(new CollectionInputFormat(), name, args); - } - - public CollectionDataSource(String name, Object[][] args) { - this(new CollectionInputFormat(), name, args); - } - - public CollectionDataSource(Collection<?> args, String name) { - this(new CollectionInputFormat(), args, name); - } - - public <T extends Iterator<?>, Serializable> CollectionDataSource(T args, String name) { - this(new CollectionInputFormat(), args, name); - } - - // -------------------------------------------------------------------------------------------- - /** - * For Scala compatibility: Scala-to-Java type conversion always produces an object wrapper. - */ - public CollectionDataSource(Object... args) { - this(new CollectionInputFormat(), args); - } - - @SuppressWarnings("unchecked") - public CollectionDataSource(CollectionInputFormat f, Object...
data) { - super(f, OperatorInfoHelper.source(), DEFAULT_NAME); - if (data.length == 1 && data[0] instanceof Iterator) { - f.setIter((Iterator<Object>) data[0]); - } - else if (data.length == 1 && data[0] instanceof Collection) { - checkFormat((Collection<Object>) data[0]); - f.setData((Collection<Object>) data[0]); - } - else { - Collection<Object> tmp = new ArrayList<Object>(); - for (Object o : data) { - tmp.add(o); - } - checkFormat(tmp); - f.setData(tmp); - } - } - - // -------------------------------------------------------------------------------------------- - /* - * check whether the input field has the same type - */ - private <T> void checkFormat(Collection<T> c) { - Class<?> type = null; - List<Class<?>> typeList = new ArrayList<Class<?>>(); - Iterator<T> it = c.iterator(); - while (it.hasNext()) { - Object o = it.next(); - - // check the input types for 1-dimension - if (type != null && !type.equals(o.getClass())) { - throw new RuntimeException("elements of input list should have the same type"); - } - else { - type = o.getClass(); - } - - // check the input types for 2-dimension array - if (typeList.size() == 0 && o.getClass().isArray()) { - for (Object s : (Object[]) o) { - typeList.add(s.getClass()); - } - } - else if (o.getClass().isArray()) { - int index = 0; - if (((Object[]) o).length != typeList.size()) { - throw new RuntimeException("elements of input list should have the same size"); - } - for (Object s : (Object[]) o) { - if (!s.getClass().equals(typeList.get(index++))) { - throw new RuntimeException("elements of input list should have the same type"); - } - } - } - - // check the input types for 2-dimension collection - if (typeList.size() == 0 && o instanceof Collection) { - @SuppressWarnings("unchecked") - Iterator<Object> tmpIt = ((Collection<Object>) o).iterator(); - while (tmpIt.hasNext()) { - Object s = tmpIt.next(); - typeList.add(s.getClass()); - } - } - else if (o instanceof Collection) { - int index = 0; - @SuppressWarnings("unchecked") - Iterator<Object> tmpIt = ((Collection<Object>) o).iterator(); - while (tmpIt.hasNext()) { - Object s = tmpIt.next(); - if (!s.getClass().equals(typeList.get(index++))) { - throw new RuntimeException("elements of input list should have the same type"); - } - } - - if (index != typeList.size()) { - throw new RuntimeException("elements of input list should have the same size"); - } - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java deleted file mode 100644 index 80e89a5..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossOperator.java +++ /dev/null @@ -1,241 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.operators; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.flink.api.common.operators.Operator; -import org.apache.flink.api.common.operators.RecordOperator; -import org.apache.flink.api.common.operators.base.CrossOperatorBase; -import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; -import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.api.java.record.functions.CrossFunction; -import org.apache.flink.api.java.record.functions.FunctionAnnotation; -import org.apache.flink.types.Key; -import org.apache.flink.types.Record; - -import com.google.common.base.Preconditions; - - -/** - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * - * CrossOperator that applies a {@link CrossFunction} to each element of the Cartesian Product. - * - * @see CrossFunction - */ -@Deprecated -public class CrossOperator extends CrossOperatorBase<Record, Record, Record, CrossFunction> implements RecordOperator { - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - public static Builder builder(CrossFunction udf) { - return new Builder(new UserCodeObjectWrapper<CrossFunction>(udf)); - } - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - public static Builder builder(Class<? extends CrossFunction> udf) { - return new Builder(new UserCodeClassWrapper<CrossFunction>(udf)); - } - - /** - * The private constructor that only gets invoked from the Builder. - * @param builder - */ - protected CrossOperator(Builder builder) { - super(builder.udf, OperatorInfoHelper.binary(), builder.name); - - if (builder.inputs1 != null && !builder.inputs1.isEmpty()) { - setFirstInput(Operator.createUnionCascade(builder.inputs1)); - } - if (builder.inputs2 != null && !builder.inputs2.isEmpty()) { - setSecondInput(Operator.createUnionCascade(builder.inputs2)); - } - - setBroadcastVariables(builder.broadcastInputs); - setSemanticProperties(FunctionAnnotation.readDualConstantAnnotations(builder.udf)); - } - - - @Override - public Class<? extends Key<?>>[] getKeyClasses() { - return emptyClassArray(); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition).
- */ - public static class Builder { - - /* The required parameters */ - private final UserCodeWrapper<CrossFunction> udf; - - /* The optional parameters */ - private List<Operator<Record>> inputs1; - private List<Operator<Record>> inputs2; - private Map<String, Operator<Record>> broadcastInputs; - private String name; - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - protected Builder(UserCodeWrapper<CrossFunction> udf) { - this.udf = udf; - this.inputs1 = new ArrayList<Operator<Record>>(); - this.inputs2 = new ArrayList<Operator<Record>>(); - this.broadcastInputs = new HashMap<String, Operator<Record>>(); - } - - /** - * Sets the first input. - * - * @param input The first input. - */ - public Builder input1(Operator<Record> input) { - Preconditions.checkNotNull(input, "The input must not be null"); - - this.inputs1.clear(); - this.inputs1.add(input); - return this; - } - - /** - * Sets the second input. - * - * @param input The second input. - */ - public Builder input2(Operator<Record> input) { - Preconditions.checkNotNull(input, "The input must not be null"); - - this.inputs2.clear(); - this.inputs2.add(input); - return this; - } - - /** - * Sets one or several inputs (union) for input 1. - * - * @param inputs - */ - public Builder input1(Operator<Record>...inputs) { - this.inputs1.clear(); - for (Operator<Record> c : inputs) { - this.inputs1.add(c); - } - return this; - } - - /** - * Sets one or several inputs (union) for input 2. - * - * @param inputs - */ - public Builder input2(Operator<Record>...inputs) { - this.inputs2.clear(); - for (Operator<Record> c : inputs) { - this.inputs2.add(c); - } - return this; - } - - /** - * Sets the first inputs. - * - * @param inputs - */ - public Builder inputs1(List<Operator<Record>> inputs) { - this.inputs1 = inputs; - return this; - } - - /** - * Sets the second inputs. - * - * @param inputs - */ - public Builder inputs2(List<Operator<Record>> inputs) { - this.inputs2 = inputs; - return this; - } - - /** - * Binds the result produced by a plan rooted at {@code root} to a - * variable used by the UDF wrapped in this operator. - */ - public Builder setBroadcastVariable(String name, Operator<Record> input) { - this.broadcastInputs.put(name, input); - return this; - } - - /** - * Binds multiple broadcast variables. - */ - public Builder setBroadcastVariables(Map<String, Operator<Record>> inputs) { - this.broadcastInputs.clear(); - this.broadcastInputs.putAll(inputs); - return this; - } - - /** - * Sets the name of this operator. - * - * @param name - */ - public Builder name(String name) { - this.name = name; - return this; - } - - /** - * Creates and returns a CrossOperator from using the values given - * to the builder. 
- * - * @return The created operator - */ - public CrossOperator build() { - setNameIfUnset(); - return new CrossOperator(this); - } - - protected void setNameIfUnset() { - if (name == null) { - name = udf.getUserCodeClass().getName(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java deleted file mode 100644 index f4a2ad1..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithLargeOperator.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.operators; - -import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossWithLarge; -import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; -import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.api.java.record.functions.CrossFunction; - - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * This operator represents a Cartesian-Product operation. Of the two inputs, the second is expected to be large - * and the first is expected to be small. - * - * @see CrossFunction - */ - -@Deprecated -public class CrossWithLargeOperator extends CrossOperator implements CrossWithLarge { - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - public static Builder builder(CrossFunction udf) { - return new Builder(new UserCodeObjectWrapper<CrossFunction>(udf)); - } - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - public static Builder builder(Class<? extends CrossFunction> udf) { - return new Builder(new UserCodeClassWrapper<CrossFunction>(udf)); - } - - /** - * The private constructor that only gets invoked from the Builder.
- * @param builder - */ - protected CrossWithLargeOperator(Builder builder) { - super(builder); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition). - */ - public static class Builder extends CrossOperator.Builder { - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - private Builder(UserCodeWrapper<CrossFunction> udf) { - super(udf); - } - - /** - * Creates and returns a CrossOperator from using the values given - * to the builder. - * - * @return The created operator - */ - @Override - public CrossWithLargeOperator build() { - setNameIfUnset(); - return new CrossWithLargeOperator(this); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/c787a037/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java ---------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java b/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java deleted file mode 100644 index 9c9b758..0000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/record/operators/CrossWithSmallOperator.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.record.operators; - -import org.apache.flink.api.common.operators.base.CrossOperatorBase.CrossWithSmall; -import org.apache.flink.api.common.operators.util.UserCodeClassWrapper; -import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper; -import org.apache.flink.api.common.operators.util.UserCodeWrapper; -import org.apache.flink.api.java.record.functions.CrossFunction; - - -/** - * - * <b>NOTE: The Record API is marked as deprecated. It is not being developed anymore and will be removed from - * the code at some point. - * See <a href="https://issues.apache.org/jira/browse/FLINK-1106">FLINK-1106</a> for more details.</b> - * - * - * This operator represents a Cartesian-Product operation. Of the two inputs, the first is expected to be large - * and the second is expected to be small. - * - * @see CrossFunction - */ -@Deprecated -public class CrossWithSmallOperator extends CrossOperator implements CrossWithSmall { - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. 
- */ - public static Builder builder(CrossFunction udf) { - return new Builder(new UserCodeObjectWrapper<CrossFunction>(udf)); - } - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - public static Builder builder(Class<? extends CrossFunction> udf) { - return new Builder(new UserCodeClassWrapper<CrossFunction>(udf)); - } - - /** - * The private constructor that only gets invoked from the Builder. - * @param builder - */ - protected CrossWithSmallOperator(Builder builder) { - super(builder); - } - - // -------------------------------------------------------------------------------------------- - - /** - * Builder pattern, straight from Joshua Bloch's Effective Java (2nd Edition). - */ - public static class Builder extends CrossOperator.Builder { - - /** - * Creates a Builder with the provided {@link CrossFunction} implementation. - * - * @param udf The {@link CrossFunction} implementation for this Cross operator. - */ - private Builder(UserCodeWrapper<CrossFunction> udf) { - super(udf); - } - - /** - * Creates and returns a CrossOperator from using the values given - * to the builder. - * - * @return The created operator - */ - @Override - public CrossWithSmallOperator build() { - setNameIfUnset(); - return new CrossWithSmallOperator(this); - } - } -}
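For readers tracking what this removal takes away, the following is a minimal sketch of how the deleted CoGroupOperator.Builder was typically wired together. It assumes a static CoGroupOperator.builder(udf, keyClass, keyColumn1, keyColumn2) factory mirroring the protected Builder constructor shown above, a record-API CoGroupFunction whose coGroup(Iterator, Iterator, Collector) signature is implied by WrappingCoGroupFunction, and StringValue as the key class; none of these appear verbatim in this hunk, so treat the sketch as illustrative only.

import java.util.Iterator;

import org.apache.flink.api.java.record.functions.CoGroupFunction;
import org.apache.flink.api.java.record.operators.CoGroupOperator;
import org.apache.flink.api.java.record.operators.CollectionDataSource;
import org.apache.flink.types.Record;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.Collector;

public class CoGroupBuilderSketch {

    // Trivial CoGroupFunction: forwards every record of the first input group.
    public static class EmitFirstSide extends CoGroupFunction {
        @Override
        public void coGroup(Iterator<Record> records1, Iterator<Record> records2, Collector<Record> out) {
            while (records1.hasNext()) {
                out.collect(records1.next());
            }
        }
    }

    public static void main(String[] args) {
        // Two in-memory sources; field 0 of each Record is assumed to carry the String key as a StringValue.
        CollectionDataSource left  = new CollectionDataSource("left side",  "a", "b", "c");
        CollectionDataSource right = new CollectionDataSource("right side", "a", "c", "d");

        // Mirrors Builder(udf, keyClass, keyColumn1, keyColumn2): key class plus the key position in each input.
        CoGroupOperator coGroup = CoGroupOperator.builder(new EmitFirstSide(), StringValue.class, 0, 0)
            .input1(left)
            .input2(right)
            .name("CoGroup sketch")
            .build();

        System.out.println("built operator: " + coGroup.getName());
    }
}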
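The CollectionDataSource javadoc above documents its construction styles (serializable iterator, collection, varargs) and the single restriction that all elements must share one type, enforced by checkFormat(...). A small sketch of the collection and varargs styles, using only constructors present in the deleted file; the names, values, and printed output are illustrative.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.java.record.operators.CollectionDataSource;

public class CollectionSourceSketch {

    public static void main(String[] args) {
        // Varargs style: the name comes first, followed by the elements (all of the same type).
        CollectionDataSource varargsSource =
            new CollectionDataSource("Varargs String source", "some", "strings", "that", "get", "distributed");

        // Collection style: note the (data, name) parameter order of this constructor.
        List<String> data = Arrays.asList("some", "more", "strings");
        CollectionDataSource collectionSource = new CollectionDataSource(data, "Collection source");

        // Mixing element types trips the checkFormat(...) validation at construction time.
        try {
            new CollectionDataSource("Mixed source", "a string", 42);
        } catch (RuntimeException e) {
            System.out.println("rejected as expected: " + e.getMessage());
        }

        System.out.println(varargsSource.getName() + " / " + collectionSource.getName());
    }
}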
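The Cross builders removed here follow the same pattern, with CrossWithSmallOperator and CrossWithLargeOperator adding only a size hint for the optimizer. A sketch under the assumption that the record-API CrossFunction exposes the cross(Record, Record) method required by CrossOperatorBase<Record, Record, Record, CrossFunction>; the builder calls themselves are the ones shown in the deleted files.

import org.apache.flink.api.java.record.functions.CrossFunction;
import org.apache.flink.api.java.record.operators.CollectionDataSource;
import org.apache.flink.api.java.record.operators.CrossOperator;
import org.apache.flink.api.java.record.operators.CrossWithSmallOperator;
import org.apache.flink.types.Record;

public class CrossBuilderSketch {

    // Trivial CrossFunction: keeps the record from the first input of each pair.
    public static class TakeFirst extends CrossFunction {
        @Override
        public Record cross(Record first, Record second) {
            return first;
        }
    }

    public static void main(String[] args) {
        CollectionDataSource big  = new CollectionDataSource("big side", 1, 2, 3, 4, 5);
        CollectionDataSource tiny = new CollectionDataSource("tiny side", 100);

        // Plain Cartesian product, no size hint.
        CrossOperator cross = CrossOperator.builder(new TakeFirst())
            .input1(big)
            .input2(tiny)
            .name("Cross sketch")
            .build();

        // Same wiring, but hinting that the second input is the small one
        // (CrossWithLargeOperator is the mirror-image hint).
        CrossOperator crossWithSmallHint = CrossWithSmallOperator.builder(new TakeFirst())
            .input1(big)
            .input2(tiny)
            .name("Cross-with-small sketch")
            .build();

        System.out.println(cross.getName() + " / " + crossWithSmallHint.getName());
    }
}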