Hi Yassine,

I ran your code without problems and got the correct result.
Can you provide the stack trace of the exception?

Thanks, Fabian

2016-10-10 10:57 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:

> Thank you Fabian and Stephan for the suggestions.
> I couldn't override "readLine()" because it's final, so I went with
> Fabian's solution, but I'm struggling with CSV field masks. Any help is
> appreciated. I created an input format which is basically
> TupleCsvInputFormat, for which I overrode the nextRecord() method to
> catch the exceptions. But I'm getting a
> *java.lang.ArrayIndexOutOfBoundsException* when I add a
> boolean[]{true, false, true} field mask. If I add an int[]{1,0,1} field
> mask, the job succeeds but outputs the first and second columns. Here is
> my code:
>
> TupleTypeInfo<Tuple2<String, String>> typeInfo =
>         TupleTypeInfo.getBasicTupleTypeInfo(String.class, String.class);
> Path histPath = new Path("hdfs:///shared/file.csv");
>
> CsvInputFormat<Tuple2<String, String>> myInputFormat =
>         new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
> myInputFormat.enableQuotedStringParsing('"');
> myInputFormat.setSkipFirstLineAsHeader(true);
> myInputFormat.setLenient(true);
>
> DataSet<Tuple2<String, String>> test =
>         env.createInput(myInputFormat, typeInfo).withParameters(parameters);
> test.writeAsText("E:\\data\\test.csv", FileSystem.WriteMode.OVERWRITE);
>
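> In case it helps narrow it down, this is the behaviour I see with the two
> mask variants (my guess is that the int[] overload is treated as a list of
> included field positions rather than as a 0/1 mask, but I'm not sure):
>
> // one flag per CSV column; this is what I actually want (columns 1 and 3),
> // but it fails with the ArrayIndexOutOfBoundsException
> new MyCsvInputFormat<>(histPath, typeInfo, new boolean[]{true, false, true});
>
> // runs, but emits the first and second columns, as if {1, 0, 1} listed the
> // included field positions (0 and 1) rather than acting as a 0/1 mask
> new MyCsvInputFormat<>(histPath, typeInfo, new int[]{1, 0, 1});
>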
> and here is the custom input format:
>
> import java.io.IOException;
>
> import org.apache.flink.api.common.ExecutionConfig;
> import org.apache.flink.api.java.io.CsvInputFormat;
> import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
> import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase;
> import org.apache.flink.core.fs.Path;
>
> public class MyCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
>     private static final long serialVersionUID = 1L;
>
>     private TupleSerializerBase<OUT> tupleSerializer;
>
>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo) {
>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo);
>     }
>
>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>                             TupleTypeInfoBase<OUT> tupleTypeInfo) {
>         this(filePath, lineDelimiter, fieldDelimiter, tupleTypeInfo,
>                 createDefaultMask(tupleTypeInfo.getArity()));
>     }
>
>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
>                             int[] includedFieldsMask) {
>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo,
>                 includedFieldsMask);
>     }
>
>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>                             TupleTypeInfoBase<OUT> tupleTypeInfo, int[] includedFieldsMask) {
>         super(filePath);
>         boolean[] mask = (includedFieldsMask == null)
>                 ? createDefaultMask(tupleTypeInfo.getArity())
>                 : toBooleanMask(includedFieldsMask);
>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, mask);
>     }
>
>     public MyCsvInputFormat(Path filePath, TupleTypeInfoBase<OUT> tupleTypeInfo,
>                             boolean[] includedFieldsMask) {
>         this(filePath, DEFAULT_LINE_DELIMITER, DEFAULT_FIELD_DELIMITER, tupleTypeInfo,
>                 includedFieldsMask);
>     }
>
>     public MyCsvInputFormat(Path filePath, String lineDelimiter, String fieldDelimiter,
>                             TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>         super(filePath);
>         configure(lineDelimiter, fieldDelimiter, tupleTypeInfo, includedFieldsMask);
>     }
>
>     // Same setup as in TupleCsvInputFormat: create the tuple serializer, set the
>     // delimiters, and register the included-fields mask with the column types.
>     private void configure(String lineDelimiter, String fieldDelimiter,
>                            TupleTypeInfoBase<OUT> tupleTypeInfo, boolean[] includedFieldsMask) {
>         if (tupleTypeInfo.getArity() == 0) {
>             throw new IllegalArgumentException("Tuple size must be greater than 0.");
>         }
>         if (includedFieldsMask == null) {
>             includedFieldsMask = createDefaultMask(tupleTypeInfo.getArity());
>         }
>         tupleSerializer = (TupleSerializerBase<OUT>) tupleTypeInfo.createSerializer(new ExecutionConfig());
>         setDelimiter(lineDelimiter);
>         setFieldDelimiter(fieldDelimiter);
>         Class<?>[] classes = new Class<?>[tupleTypeInfo.getArity()];
>         for (int i = 0; i < tupleTypeInfo.getArity(); i++) {
>             classes[i] = tupleTypeInfo.getTypeAt(i).getTypeClass();
>         }
>         setFieldsGeneric(includedFieldsMask, classes);
>     }
>     @Override
>     public OUT fillRecord(OUT reuse, Object[] parsedValues) {
>         return tupleSerializer.createOrReuseInstance(parsedValues, reuse);
>     }
>
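>     // Keep reading: ignore (just log) the IOExceptions thrown for corrupted
>     // input and retry until a record is parsed or the end of the split is reached.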
>     @Override
>     public OUT nextRecord(OUT record) {
>         OUT returnRecord = null;
>         do {
>             try {
>                 returnRecord = super.nextRecord(record);
>             } catch (IOException e) {
>                 e.printStackTrace();
>             }
>         } while (returnRecord == null && !reachedEnd());
>         return returnRecord;
>     }
> }
>
> Thanks,
> Yassine
>
> 2016-10-04 18:35 GMT+02:00 Stephan Ewen <se...@apache.org>:
>
>> How about just overriding the "readLine()" method to call
>> "super.readLine()" and catching EOF exceptions?
>>
>> On Tue, Oct 4, 2016 at 5:56 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Yassine,
>>>
>>> AFAIK, there is no built-in way to ignore corrupted compressed files.
>>> You could try to implement a FileInputFormat that wraps the
>>> CsvInputFormat, forwards all calls to the wrapped CsvInputFormat, and
>>> catches and ignores the EOFException.
>>>
>>> If you do that, you would not be able to use the env.readCsvFile()
>>> shortcut but would need to create an instance of your own InputFormat
>>> and add it with env.readFile(yourIF).
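>>>
>>> Something along these lines might work (untested sketch; the class name
>>> is made up, it is hard-coded to a Tuple2<String, String> schema, and the
>>> remaining methods such as configure(), createInputSplits() and
>>> getStatistics() would need to be forwarded to the wrapped format in the
>>> same way):
>>>
>>> import java.io.EOFException;
>>> import java.io.IOException;
>>>
>>> import org.apache.flink.api.common.io.FileInputFormat;
>>> import org.apache.flink.api.java.io.CsvInputFormat;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.core.fs.FileInputSplit;
>>>
>>> public class LenientCsvInputFormat extends FileInputFormat<Tuple2<String, String>> {
>>>
>>>     private final CsvInputFormat<Tuple2<String, String>> wrapped;
>>>     private boolean splitCorrupted = false;
>>>
>>>     public LenientCsvInputFormat(CsvInputFormat<Tuple2<String, String>> wrapped) {
>>>         this.wrapped = wrapped;
>>>     }
>>>
>>>     @Override
>>>     public void open(FileInputSplit split) throws IOException {
>>>         splitCorrupted = false;
>>>         wrapped.open(split);
>>>     }
>>>
>>>     @Override
>>>     public boolean reachedEnd() throws IOException {
>>>         // once a split turned out to be corrupted, report it as finished
>>>         return splitCorrupted || wrapped.reachedEnd();
>>>     }
>>>
>>>     @Override
>>>     public Tuple2<String, String> nextRecord(Tuple2<String, String> reuse) throws IOException {
>>>         try {
>>>             return wrapped.nextRecord(reuse);
>>>         } catch (EOFException e) {
>>>             // truncated GZip stream: skip the rest of this split
>>>             splitCorrupted = true;
>>>             return null;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void close() throws IOException {
>>>         wrapped.close();
>>>     }
>>> }
>>>
>>> The splitCorrupted flag is there so that the data source does not keep
>>> calling nextRecord() on a stream that will just keep throwing.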
>>>
>>> Hope this helps,
>>> Fabian
>>>
>>> 2016-10-04 17:43 GMT+02:00 Yassine MARZOUGUI <y.marzou...@mindlytix.com>:
>>>
>>>> Hi all,
>>>>
>>>> I am reading a large number of GZip-compressed CSV files, nested in an
>>>> HDFS directory:
>>>>
>>>> Configuration parameters = new Configuration();
>>>> parameters.setBoolean("recursive.file.enumeration", true);
>>>> DataSet<Tuple2<String, Long>> hist = env.readCsvFile("hdfs:///shared/logs/")
>>>>                 .ignoreFirstLine()
>>>>                 .fieldDelimiter("|")
>>>>                 .includeFields("011000")
>>>>                 .types(String.class, Long.class)
>>>>                 .withParameters(parameters);
>>>>
>>>> My job is failing with the following exception:
>>>>
>>>> 2016-10-04 17:19:59,933 INFO  org.apache.flink.runtime.jobmanager.JobManager  - Status of job 66fbbbbce11277a4df6aa48dee636a81 (HDFSTest) changed to FAILING.
>>>>
>>>> java.io.EOFException: Unexpected end of ZLIB input stream
>>>>    at java.util.zip.InflaterInputStream.fill(Unknown Source)
>>>>    at java.util.zip.InflaterInputStream.read(Unknown Source)
>>>>    at java.util.zip.GZIPInputStream.read(Unknown Source)
>>>>    at org.apache.flink.api.common.io.InputStreamFSInputWrapper.read(InputStreamFSInputWrapper.java:75)
>>>>    at org.apache.flink.api.common.io.DelimitedInputFormat.fillBuffer(DelimitedInputFormat.java:591)
>>>>    at org.apache.flink.api.common.io.DelimitedInputFormat.readLine(DelimitedInputFormat.java:513)
>>>>    at org.apache.flink.api.common.io.DelimitedInputFormat.nextRecord(DelimitedInputFormat.java:479)
>>>>    at org.apache.flink.api.java.io.CsvInputFormat.nextRecord(CsvInputFormat.java:78)
>>>>    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:162)
>>>>    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:608)
>>>>    at java.lang.Thread.run(Unknown Source)
>>>>
>>>> I think it is due to some improperly compressed files. How can I handle
>>>> and ignore such exceptions? Thanks.
>>>>
>>>>
>>>> Best,
>>>> Yassine
>>>>
>>>>
>>>
>>
>
