> On July 30, 2015, 6:59 p.m., Yan Fang wrote:
> > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala, line 40
> > <https://reviews.apache.org/r/35445/diff/4/?file=1023371#file1023371line40>
> >
> >     My overall concern here is: if more than one task is running, is it possible
> >     that all the tasks are writing to the same file at the same time?
>
> Eli Reisman wrote:
>     I don't think so. Each registered source should be using its own HdfsWriter in
>     write() calls, even on the same Producer, and the filenames per writer are made
>     unique in the writer impl. There are other ways to accomplish that uniqueness, though.
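The per-writer filename uniqueness Eli describes, which the thread attributes to UUID.randomUUID, can be sketched in Scala as below. This is a minimal illustration under stated assumptions, not the patch's code: the class name `UniqueFileNamer`, the `<prefix>-<uuid>-<sequence>` naming scheme, and the zero-padded counter are all hypothetical. Each writer draws one random UUID when it is constructed, so two tasks writing into the same bucket directory get different file names, and a per-writer counter keeps successive files from the same writer distinct.

```scala
import java.util.UUID

// Hypothetical helper (not the patch's code): one instance per HdfsWriter.
final class UniqueFileNamer(prefix: String) {
  // Drawn once per writer, so two writers never share a name in the same
  // bucket directory unless two random UUIDs collide.
  private val writerId: String = UUID.randomUUID().toString

  // Distinguishes successive files opened by the same writer (e.g. after a batch roll-over).
  private var sequence: Long = 0L

  /** Returns the next file path inside bucketDir: "<bucketDir>/<prefix>-<uuid>-00000", then -00001, ... */
  def nextFile(bucketDir: String): String = {
    val path = f"$bucketDir/$prefix-$writerId-$sequence%05d"
    sequence += 1
    path
  }
}
```

A collision here would require two randomly generated (version 4) UUIDs to match, which is the "lottery" case mentioned in the reply.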

I see. We are using UUID.randomUUID to make sure the writers write to different files. This is fine unless we win the lottery. :)


> On July 30, 2015, 6:59 p.m., Yan Fang wrote:
> > samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala, line 37
> > <https://reviews.apache.org/r/35445/diff/4/?file=1023373#file1023373line37>
> >
> >     I would prefer the param idea because 1) Samza already uses this pattern, and
> >     2) it means less code, especially as more SequenceFileHdfsWriters come out
> >     (LongWritable, etc.).
> >
> >     "like the casting of the outgoing message to something not-Writable
> >     like Array[Byte] or String might require a third param and it might start
> >     to get awkward"
> >
> >     -- We can always cast the outgoing msg to Array[Byte] using the serde
> >     defined for this msg. So as long as the Writable accepts Array[Byte], this
> >     should be fine.
> >
> >     "Also there are some Writable types that would not allow us to
> >     determine message size for batching purposes the way "
> >
> >     -- I think we can either give it a default size (which can be
> >     configurable) when there is no getLength method, or use a subclass.
> >     Either way will be fine.
>
> Eli Reisman wrote:
>     I definitely agree on the less-code point, and I think we can move
>     functions like the compression selection to the base class.
>
>     But I don't think we can just cast to Array[Byte] for all the Writable
>     types to accept the message, even from the serde. Only Text and BytesWritable
>     accept Array[Byte] messages, so we will be limited to just those two
>     types forever if we only use that cast on the outgoing message before
>     wrapping it in the Writable. If that works (i.e. messages will never be
>     FloatWritable, LongWritable, etc.), then generics will work there.
>
>     But the getLength issue still presents a problem. We already have a
>     configuration to set a default or user-defined batch size, but getLength
>     is called on every message write, and it's how we track how big the current
>     file is. Without tracking it, we won't know when to split or when we hit
>     that configured size. Each Writable will need slightly different logic to
>     pick up or estimate message size; they don't all supply a getLength call
>     for byte size.
>
>     So again, that seems to force us to only work with BytesWritable and Text
>     value types? If I'm completely missing something here, please let me know
>     and we can make the desired changes. Thanks for the input!

I see. Considering those facts, IMO, moving the common code, such as getCompression and getBucketer, to the parent class is sufficient.

BTW,

    def getBucketer(systemName: String, config: HdfsConfig) = {
      new JobNameDateTimeBucketer(systemName, config)
    }

Should it be Bucketer.getInstance(systemName, config)? If we move it to the parent class, then just "bucketer", I think.


- Yan


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/35445/#review93614
-----------------------------------------------------------


On July 28, 2015, 5:25 a.m., Eli Reisman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/35445/
> -----------------------------------------------------------
>
> (Updated July 28, 2015, 5:25 a.m.)
>
>
> Review request for samza.
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-693: Very basic HDFS Producer service for Samza
>
>
> Diffs
> -----
>
>   build.gradle 0852adc
>   docs/learn/documentation/versioned/hdfs/producer.md PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducer.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemProducerMetrics.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/BinarySequenceFileHdfsWriter.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/Bucketer.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/HdfsWriter.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/JobNameDateTimeBucketer.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/SequenceFileHdfsWriter.scala PRE-CREATION
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/writer/TextSequenceFileHdfsWriter.scala PRE-CREATION
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job-text.properties PRE-CREATION
>   samza-hdfs/src/test/resources/samza-hdfs-test-batch-job.properties PRE-CREATION
>   samza-hdfs/src/test/resources/samza-hdfs-test-job-text.properties PRE-CREATION
>   samza-hdfs/src/test/resources/samza-hdfs-test-job.properties PRE-CREATION
>   samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala PRE-CREATION
>   settings.gradle 19bff97
>
> Diff: https://reviews.apache.org/r/35445/diff/
>
>
> Testing
> -------
>
> Updated: See JIRA SAMZA-693 for details. This latest update (693-4) addresses
> post-review issues and adds a more pluggable design, several default writer
> implementations, and more (and more thorough) unit tests.
>
> Passes 'gradle clean test'.
>
>
> Thanks,
>
> Eli Reisman
>
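Yan's closing suggestion in this review (keep the typed writers as subclasses, but move shared code such as getCompression and getBucketer into the parent class), combined with Eli's point that not every Writable exposes a byte length, can be sketched as below. This is a hypothetical illustration that deliberately avoids a Hadoop dependency: `WriterSettings`, `PathBucketer.nextPath`, `SequenceFileWriterBase`, `TextLikeWriter`, and `LongLikeWriter` are invented stand-ins for HdfsConfig, the patch's Bucketer, and Text/LongWritable-based writers, not the patch's API. The base class owns compression selection, the bucketer, and batch-size accounting; subclasses only convert the serde's bytes and, when they can, report an exact size, otherwise falling back to a configurable default size as Yan proposed.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the batching and compression settings carried by HdfsConfig.
final case class WriterSettings(
    compression: String,    // e.g. "gzip", "snappy", "none"
    batchSizeBytes: Long,   // start a new file once the current one reaches this many bytes
    defaultRecordSize: Long // size assumed for value types that expose no byte length
)

// Hypothetical stand-in for the patch's Bucketer: hands out the next output path.
trait PathBucketer { def nextPath(): String }

// Shared logic lives once in the parent; V is the value type a subclass writes.
abstract class SequenceFileWriterBase[V](settings: WriterSettings, val bucketer: PathBucketer) {
  private var bytesInCurrentFile: Long = 0L
  private var paths: Vector[String] = Vector.empty

  // Stand-in for getCompression: normalizes the configured codec name.
  final def compression: String = settings.compression.trim.toLowerCase match {
    case "" | "none" => "none"
    case codec       => codec
  }

  // Each subclass turns the serde's bytes into its own value type.
  protected def wrap(message: Array[Byte]): V

  // Subclasses that know their exact length override this; others use the configured default.
  protected def sizeOf(value: V): Long = settings.defaultRecordSize

  final def write(message: Array[Byte]): Unit = {
    val value = wrap(message)
    // Roll to a new file before the first write and whenever the batch size has been reached.
    if (paths.isEmpty || bytesInCurrentFile >= settings.batchSizeBytes) {
      paths :+= bucketer.nextPath()
      bytesInCurrentFile = 0L
    }
    // A real writer would append `value` to the open SequenceFile at paths.last here.
    bytesInCurrentFile += sizeOf(value)
  }

  // Every path this writer has opened, in order.
  final def files: Vector[String] = paths
}

// Text-like writer: the byte length of each value is known exactly.
final class TextLikeWriter(cfg: WriterSettings, bkt: PathBucketer)
    extends SequenceFileWriterBase[String](cfg, bkt) {
  protected def wrap(message: Array[Byte]): String = new String(message, StandardCharsets.UTF_8)
  override protected def sizeOf(value: String): Long =
    value.getBytes(StandardCharsets.UTF_8).length.toLong
}

// LongWritable-like writer with no length method: falls back to defaultRecordSize.
// Assumes (hypothetically) that the serde emitted 8 big-endian bytes per message.
final class LongLikeWriter(cfg: WriterSettings, bkt: PathBucketer)
    extends SequenceFileWriterBase[Long](cfg, bkt) {
  protected def wrap(message: Array[Byte]): Long = ByteBuffer.wrap(message).getLong
}
```

Because the roll-over check runs before each write, a file can overshoot `batchSizeBytes` by up to one record; for value types without a length, the accuracy of the split depends entirely on how well `defaultRecordSize` is chosen, which is the trade-off Eli raises.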