I think you are confusing the block sizes of the local FS, HDFS, and the BinaryIF/OF. The block size of the local FS is not related. In fact, 4 KB is much too small: each block results in one input split, so a 4 KB block size will create thousands of splits and cause a lot of scheduling overhead.
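To put rough numbers on that scheduling overhead, here is a quick back-of-the-envelope illustration (not Flink API code; the 1 GB file size is just an assumption for the example):

```java
public class SplitCount {
    public static void main(String[] args) {
        long fileSize = 1L << 30; // a hypothetical 1 GB file

        // One input split is created per block:
        long splitsAt4Kb = fileSize / (4L * 1024);          // 4 KB blocks
        long splitsAt64Mb = fileSize / (64L * 1024 * 1024); // 64 MB blocks

        System.out.println(splitsAt4Kb);  // 262144 splits to schedule
        System.out.println(splitsAt64Mb); // 16 splits to schedule
    }
}
```

So the same file costs four orders of magnitude more scheduling work with 4 KB blocks than with 64 MB blocks.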
The HDFS block size should be a multiple of the BinaryIF/OF block size to avoid data access over the network. I would set the default block size to 32 or 64 MB, given that multiples of 64 MB are common values for HDFS block sizes.

I reached this solution, do you think it could be ok (taking into account that my local fs block size is 4096)?

blockSize = new org.apache.hadoop.conf.Configuration().getInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);

In this way, if I find the hadoop config files in the resources folder I use that block size, otherwise 4096. This also gives me a consistent setting when I run the job locally, whether the hadoop config files are there or not.

Now I have another problem: the byte[] of the Tuple2 is written using Thrift 0.9.2, but the one on the Flink dist is 0.6.1, and I think this is the cause of errors like:

java.lang.AbstractMethodError: org.apache.thrift.TUnion.readValue(Lorg/apache/thrift/protocol/TProtocol;Lorg/apache/thrift/protocol/TField;)Ljava/lang/Object;
        at org.apache.thrift.TUnion.read(TUnion.java:135)
        at org.apache.jena.riot.thrift.BinRDF.apply(BinRDF.java:187)
        at org.apache.jena.riot.thrift.BinRDF.applyVisitor(BinRDF.java:176)
        at org.apache.jena.riot.thrift.BinRDF.protocolToStream(BinRDF.java:164)
        at org.apache.jena.riot.thrift.BinRDF.inputStreamToStream(BinRDF.java:149)

What is the best way to fix such version mismatch problems?

On Fri, May 8, 2015 at 6:14 PM, Fabian Hueske <[email protected]> wrote:

> The point is that you don't want Flink to automatically infer the
> parameter, because the default parameter depends on the file system.
> If you write a file to local FS, this happens with a different default
> parameter than if you would write to HDFS.
> Just set the parameter to the same value, e.g. 64 MB, when reading and writing.
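Regarding the Thrift clash: a common way out of such classpath version conflicts is to shade the newer Thrift into the job jar and relocate its package, so the 0.9.2 classes your code and Jena need cannot collide with the 0.6.1 classes on Flink's classpath. A sketch, assuming a Maven build with the maven-shade-plugin (the `my.shaded` prefix is a made-up example package):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- move Thrift 0.9.2 into a private package in the fat jar -->
            <pattern>org.apache.thrift</pattern>
            <shadedPattern>my.shaded.org.apache.thrift</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>
```

Note that this only works if the Jena jars that call Thrift are bundled into the same fat jar, because the shade plugin rewrites their bytecode references to the relocated package as well.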
2015-05-08 18:07 GMT+02:00 Flavio Pompermaier <[email protected]>:

>> If I haven't set that param in the code of the job, do you think Flink
>> automatically infers it from somewhere in the hadoop xxx-site.xml files
>> or from the hadoop cluster?
>>
>> On Fri, May 8, 2015 at 6:02 PM, Fabian Hueske <[email protected]> wrote:
>>
>>> The value of the parameter is not important for correctness, but it must
>>> be the same when writing and reading.
>>> Try setting it to 64 MB.
>>>
>>> 2015-05-08 17:52 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>
>>>> How can I retrieve the right one? If I write with a block size
>>>> different from the one of HDFS, can I still read it then?
>>>>
>>>> On Fri, May 8, 2015 at 5:40 PM, Fabian Hueske <[email protected]> wrote:
>>>>
>>>>> Have you tried to explicitly set the blocksize parameter when writing
>>>>> and reading?
>>>>> The default value might be different when reading from local FS and
>>>>> HDFS.
>>>>>
>>>>> 2015-05-08 17:34 GMT+02:00 Flavio Pompermaier <[email protected]>:
>>>>>
>>>>>> Hi to all,
>>>>>>
>>>>>> I've created a dataset of Tuple2<String, byte[]> and I saved it on my
>>>>>> local fs (a folder with 8 files because I run the program with
>>>>>> parallelism 8) with the following code:
>>>>>>
>>>>>> Configuration configuration = new Configuration();
>>>>>> TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
>>>>>>     new TypeSerializerOutputFormat<>();
>>>>>> outputFormat.setOutputFilePath(new Path(targetDir));
>>>>>> outputFormat.setWriteMode(WriteMode.OVERWRITE);
>>>>>> outputFormat.configure(configuration);
>>>>>> ds.output(outputFormat);
>>>>>>
>>>>>> Then, if I read such a folder from the local fs everything is fine;
>>>>>> if I read it from HDFS instead, I get the following exception:
>>>>>>
>>>>>> java.io.EOFException
>>>>>>     at java.io.DataInputStream.readFully(DataInputStream.java:197)
>>>>>>     at java.io.DataInputStream.readFully(DataInputStream.java:169)
>>>>>>     at org.apache.flink.core.memory.InputViewDataInputStreamWrapper.readFully(InputViewDataInputStreamWrapper.java:62)
>>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:81)
>>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:87)
>>>>>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:30)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:136)
>>>>>>     at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
>>>>>>     at org.apache.flink.api.java.io.TypeSerializerInputFormat.deserialize(TypeSerializerInputFormat.java:50)
>>>>>>     at org.apache.flink.api.common.io.BinaryInputFormat.nextRecord(BinaryInputFormat.java:274)
>>>>>>     at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:173)
>>>>>>     at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:217)
>>>>>>
>>>>>> Could you help in understanding what's going on?
>>>>>>
>>>>>> The code I use to read the serialized ds is:
>>>>>>
>>>>>> TupleTypeInfo<Tuple2<String, byte[]>> tInfo = new TupleTypeInfo<Tuple2<String, byte[]>>(
>>>>>>     BasicTypeInfo.STRING_TYPE_INFO,
>>>>>>     PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
>>>>>> TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
>>>>>>     new TypeSerializerInputFormat<>(tInfo);
>>>>>> inputFormat.setFilePath(new Path(inputDir));
>>>>>> inputFormat.configure(conf);
>>>>>> DataSet<Tuple6<String, String, String, String, String, String>> ret =
>>>>>>     env.createInput(inputFormat).flatMap(XXX);
>>>>>>
>>>>>> Best,
>>>>>> Flavio
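For completeness, here is what "explicitly set the blocksize parameter when writing and reading" could look like in the code from the thread. This is only a sketch: the parameter keys "output.block_size" and "input.block_size" are my assumption for the BinaryOutputFormat/BinaryInputFormat of this Flink version, so check them against the BLOCK_SIZE_PARAMETER_KEY constants in your dist; targetDir, inputDir, ds, tInfo, and env are taken from the snippets above.

```java
long blockSize = 64L * 1024 * 1024; // must be identical on the write and read side

// Writing: pass the block size to the output format via its Configuration.
Configuration writeConf = new Configuration();
writeConf.setLong("output.block_size", blockSize); // assumed key, see BinaryOutputFormat
TypeSerializerOutputFormat<Tuple2<String, byte[]>> outputFormat =
    new TypeSerializerOutputFormat<>();
outputFormat.setOutputFilePath(new Path(targetDir));
outputFormat.setWriteMode(WriteMode.OVERWRITE);
outputFormat.configure(writeConf);
ds.output(outputFormat);

// Reading: use the same value, regardless of local FS or HDFS defaults.
Configuration readConf = new Configuration();
readConf.setLong("input.block_size", blockSize); // assumed key, see BinaryInputFormat
TypeSerializerInputFormat<Tuple2<String, byte[]>> inputFormat =
    new TypeSerializerInputFormat<>(tInfo);
inputFormat.setFilePath(new Path(inputDir));
inputFormat.configure(readConf);
env.createInput(inputFormat);
```

Pinning the value in the job code this way sidesteps the EOFException above, since the block boundaries no longer depend on which file system's default was in effect when the file was written.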
