If I haven't set that param in the code of the job, do you think Flink automatically infers it from somewhere in the Hadoop xxx-site.xml files or from the Hadoop cluster?
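The parameter under discussion can be pinned explicitly on both the write and the read path. A minimal sketch, assuming the `output.block_size` / `input.block_size` configuration keys read by Flink's `BinaryOutputFormat` / `BinaryInputFormat` (an assumption for 0.9-era Flink; verify the `BLOCK_SIZE_PARAMETER_KEY` constants in the version actually in use):

```java
// Sketch: set the same block size when writing and when reading.
// The key names are assumptions -- check BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY
// and BinaryInputFormat.BLOCK_SIZE_PARAMETER_KEY in your Flink version.
long blockSize = 64L * 1024 * 1024; // 64 MB, as suggested in the thread

Configuration writeConf = new Configuration();
writeConf.setLong("output.block_size", blockSize);
outputFormat.configure(writeConf);

Configuration readConf = new Configuration();
readConf.setLong("input.block_size", blockSize);
inputFormat.configure(readConf);
```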
On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <[email protected]> wrote:

> The value of the parameter is not important for correctness, but it must be
> the same when writing and reading.
> Try setting it to 64 MB.
>
> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[email protected]>:
>
>> How can I retrieve the right one? If I write with a block size different
>> from the one of HDFS, can I still read it then?
>>
>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]> wrote:
>>
>>> Have you tried to explicitly set the block size parameter when writing
>>> and reading?
>>> The default value might be different when reading from the local FS and from HDFS.
>>>
>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>
>>>> Hi to all,
>>>>
>>>> I've created a dataset of Tuple2<String, byte[]> and saved it on my
>>>> local FS (a folder with 8 files, because I run the program with
>>>> parallelism 8) with the following code:
>>>>
>>>> Configuration configuration = new Configuration();
>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>>>>     new TypeSerializerOutputFormat<>();
>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>> outputFormat.configure(configuration);
>>>> ds.output(outputFormat);
>>>>
>>>> Then, if I read that folder from the local FS everything is fine, but
>>>> if I read it from HDFS I get the following exception:
>>>>
>>>> java.io.EOFException
>>>>   at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>>   at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>   at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>>   at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>   at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>   at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>>   at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>>   at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>>   at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>
>>>> Could you help me understand what's going on?
>>>>
>>>> The code I use to read the serialized ds is:
>>>>
>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>     BasicTypeInfo.STRING_TYPE_INFO,
>>>>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>>>>     new TypeSerializerInputFormat<>(tInfo);
>>>> inputFormat.setFilePath(new Path(inputDir));
>>>> inputFormat.configure(conf);
>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>>     env.createInput(inputFormat).flatMap(XXX);
>>>>
>>>> Best,
>>>> Flavio
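The `EOFException` in the trace above comes from `java.io.DataInputStream.readFully`, which throws when the stream ends before the requested number of bytes can be read — which is what happens when the reader assumes block boundaries different from the ones the writer used and starts deserializing from the wrong offset. A self-contained illustration in plain Java (no Flink; the length-prefixed record format is a made-up stand-in for the serialized tuples):

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

public class EofDemo {
    // Reads one length-prefixed record and returns a short status string.
    static String readRecord(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int len = in.readInt();          // 4-byte length prefix
            byte[] payload = new byte[len];
            in.readFully(payload);           // throws EOFException if fewer than len bytes remain
            return "read " + len + " bytes";
        } catch (EOFException e) {
            return "EOFException: stream ended mid-record";
        }
    }

    public static void main(String[] args) throws IOException {
        // Prefix claims 10 payload bytes, but only 3 follow -- analogous to
        // deserializing from a misaligned offset caused by a block-size mismatch.
        System.out.println(readRecord(new byte[]{0, 0, 0, 10, 1, 2, 3}));
        // A consistent record reads fine.
        System.out.println(readRecord(new byte[]{0, 0, 0, 3, 1, 2, 3}));
    }
}
```

With matching write/read block sizes, every split starts on a real record boundary and `readFully` always gets the bytes it asks for.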
