Hello,

we are trying to write events to HDFS using the HdfsWriter provided by the
samza-hdfs package.
The latest patches in version 0.10.1 fixed the bug regarding closing files,
but the Bucketer does not seem to work with any of the provided HdfsWriter
implementations.
Every new event sent to the HDFS output stream system creates a new file
on HDFS. According to the documentation,
"systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864"
should cause events to be appended to an existing file until the byte
threshold is reached.
Is this a known bug, or have I missed something in my implementation?
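
To make the expected behaviour concrete, here is a minimal, self-contained
sketch of size-based file rollover as I understand it from the documentation.
It is not Samza's HdfsWriter code; the class BatchRolloverSketch and its
methods are hypothetical and only model the bookkeeping (no HDFS access).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of size-based rollover; NOT Samza's implementation.
public class BatchRolloverSketch {
    private final long batchSizeBytes;
    private long bytesInCurrentFile = 0;
    // One entry per "file" opened so far; the value is the record count in that file.
    private final List<Integer> recordsPerFile = new ArrayList<>();

    public BatchRolloverSketch(long batchSizeBytes) {
        this.batchSizeBytes = batchSizeBytes;
    }

    public void write(byte[] record) {
        // Open a new file only if none is open yet, or if appending this
        // record would push the current file past the byte threshold.
        boolean noFileOpen = recordsPerFile.isEmpty();
        if (noFileOpen || bytesInCurrentFile + record.length > batchSizeBytes) {
            recordsPerFile.add(0);
            bytesInCurrentFile = 0;
        }
        bytesInCurrentFile += record.length;
        int last = recordsPerFile.size() - 1;
        recordsPerFile.set(last, recordsPerFile.get(last) + 1);
    }

    public int filesOpened() {
        return recordsPerFile.size();
    }

    public int recordsInFile(int index) {
        return recordsPerFile.get(index);
    }

    public static void main(String[] args) {
        // Same threshold as in the config: 64 MiB.
        BatchRolloverSketch writer = new BatchRolloverSketch(67108864L);
        byte[] event = new byte[1024]; // assumed event size of 1 KiB
        for (int i = 0; i < 100000; i++) {
            writer.write(event);
        }
        System.out.println("files opened: " + writer.filesOpened());
    }
}
```

With 100000 events of 1 KiB each, this model opens two files rather than
100000, which is the behaviour I expected from the batch size setting.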

Code Snippet:


> # HDFS System
> systems.hdfsstream.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory
> systems.hdfsstream.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.BinarySequenceFileHdfsWriter
> systems.hdfsstream.producer.hdfs.write.batch.size.bytes=67108864
> systems.hdfsstream.samza.msg.serde=json
>
> # The base dir for HDFS output. The default Bucketer for SequenceFile HdfsWriters
> systems.hdfsstream.producer.hdfs.base.output.dir=/user/hive/warehouse/foobar
>
> # Bucket into following
> systems.hdfsstream.producer.hdfs.bucketer.class=org.apache.samza.system.hdfs.writer.JobNameDateTimeBucketer
> systems.hdfsstream.producer.hdfs.bucketer.date.path.format=yyyy
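
For reference, this is how I would expect the bucket directory to be derived
from the settings above. It is a sketch under the assumption that the
bucketer builds the path as <base dir>/<job name>/<formatted date>; the class
BucketPathSketch, the method bucketFor and the job name "foobarJob" are
hypothetical and not part of Samza.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical sketch; assumes bucket = <base dir>/<job name>/<formatted date>.
public class BucketPathSketch {

    public static String bucketFor(String baseDir, String jobName,
                                   String datePattern, Date when) {
        SimpleDateFormat format = new SimpleDateFormat(datePattern);
        // Fixed time zone so the result does not depend on the host settings.
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return baseDir + "/" + jobName + "/" + format.format(when);
    }

    public static void main(String[] args) {
        String baseDir = "/user/hive/warehouse/foobar";
        Date firstOfMay = new Date(1462060800000L);  // 2016-05-01T00:00:00Z
        Date firstOfJune = new Date(1464739200000L); // 2016-06-01T00:00:00Z

        String a = bucketFor(baseDir, "foobarJob", "yyyy", firstOfMay);
        String b = bucketFor(baseDir, "foobarJob", "yyyy", firstOfJune);

        System.out.println(a);
        System.out.println("same bucket: " + a.equals(b));
    }
}
```

If that assumption holds, the pattern "yyyy" changes the bucket only once per
year, so a change of bucket should not be the reason for a new file on every
event.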


> // Output stream on the "hdfsstream" system defined in the config above
> private final SystemStream outputStream = new SystemStream("hdfsstream", "foobarTask");
>

> @Override
> public void process(IncomingMessageEnvelope envelope, MessageCollector messageCollector,
>         TaskCoordinator taskCoordinator) throws ClassNotFoundException, SQLException
> {
>     // Time each call to process()
>     final Timer.Context context = registry.timer("foobar").time();
>     try
>     {
>         // Parse the incoming JSON string into SomeClass
>         String incoming = (String) envelope.getMessage();
>         GsonBuilder gsonBuilder = new GsonBuilder().registerTypeAdapter(DateTime.class, new DateTimeConverter());
>         Gson gson = gsonBuilder.create();
>         SomeClass message = gson.fromJson(incoming, SomeClass.class);
>
>         try
>         {
>             // Re-serialize the message and send it to the HDFS output stream
>             messageCollector.send(new OutgoingMessageEnvelope(outputStream, gson.toJson(message)));
>             registry.counter("foobar").inc();
>         }
>         catch (Exception e)
>         {
>             LOGGER.error("error with message: ", e);
>             registry.counter("failedProcessCounter").inc();
>         }
>     }
>     finally
>     {
>         context.stop();
>     }
> }



Kind regards
Thees Gieselmann

-- 
Board of Directors: Jan-Niclaus Mewes, Claas Heiland
Commercial Register: HRB 110377

