> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > Still in the middle but don't want to lose what I had up to now.

I'm also in the middle of addressing all the feedback. I have all the changes 
locally and will push them all together later. Thanks again for your review!


> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java, 
> > line 47
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493800#file1493800line47>
> >
> >     nit: when you refer to the class names, it would be better to use 
> > {@link HdfsSystemAdmin} {@link HdfsSystemConsumer} etc.

Will do.


> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java, 
> > line 91
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493800#file1493800line91>
> >
> >     You can do:
> >     try (FSDataOutputStream fos = fs.create(targetPath)) {
> >       fos.write(PartitionDescriptionUtil.toJson());
> >     }

Do you just intend to narrow down the try block? The "getFileSystem" method 
above also throws IOException, so I have to include everything here.
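
For illustration, a minimal sketch of how a try-with-resources block can sit 
inside a wider try that still covers the earlier call. It uses java.nio.file 
as a stand-in for the Hadoop FileSystem API; DescriptorWriter, resolveTarget 
and persist are hypothetical names, not the code in this RB.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical stand-in: java.nio.file replaces the Hadoop FileSystem API here.
final class DescriptorWriter {

  // Plays the role of the "getFileSystem" step, which can itself throw IOException.
  static Path resolveTarget(Path dir, String name) throws IOException {
    Files.createDirectories(dir);
    return dir.resolve(name);
  }

  static void persist(Path dir, String name, String json) {
    try {
      // The outer try covers the setup call ...
      Path target = resolveTarget(dir, name);
      // ... while the inner try-with-resources closes the stream automatically.
      try (OutputStream out = Files.newOutputStream(target)) {
        out.write(json.getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to persist " + name, e);
    }
  }
}
```

Both IOException sources end up in the same catch, and the stream is closed 
even when the write fails.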


> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java, 
> > line 105
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493800#file1493800line105>
> >
> >     What if the PartitionDescriptor already exists? Could it be the case 
> > that the systemStreamMetadata maintains a different copy of 
> > PartitionDescriptor? It is not clear to me which one is the source of 
> > truth: directoryPartitioner.getPartitionMetadataMap() or 
> > directoryPartitioner.getPartitionDescriptor()? Maybe I am missing some 
> > basic information regarding the concept of PartitionDescriptor vs 
> > PartitionMetadataMap?

You are right. This is an unsolved problem, given that we assume the HDFS 
folder is immutable. So what happens if the HDFS folder really is altered? 
Until we support mutable HDFS input, I think we have two options:
1. Throw a runtime exception to stop the job if we see an inconsistency.
2. Ignore the newer folder info and always treat the partition descriptors 
that we persisted on HDFS as the source of truth, proceeding with the job 
without killing it or throwing an exception.
Do you have a preference?
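
To make the two options concrete, here is a rough sketch. The class, the 
Policy enum and the Map<String, List<String>> shape (partition id to file 
paths) are all assumptions for illustration; the actual descriptor type in 
this RB may differ.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the patch.
final class DescriptorReconciler {

  /** The two options: stop the job, or trust what was persisted. */
  enum Policy { FAIL_ON_MISMATCH, PREFER_PERSISTED }

  static Map<String, List<String>> reconcile(
      Map<String, List<String>> persisted,
      Map<String, List<String>> fresh,
      Policy policy) {
    if (persisted == null) {
      // Nothing persisted yet, so the fresh listing is all we have.
      return fresh;
    }
    if (policy == Policy.FAIL_ON_MISMATCH && !persisted.equals(fresh)) {
      // Option 1: surface the inconsistency and stop.
      throw new IllegalStateException(
          "HDFS input changed since the partition descriptors were persisted");
    }
    // Option 2 (or no mismatch): the persisted copy is the source of truth.
    return persisted;
  }
}
```

Either way the persisted descriptor wins whenever the job is allowed to 
continue; the only difference is whether a mismatch is fatal.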


> On Sept. 13, 2016, 12:33 a.m., Yi Pan (Data Infrastructure) wrote:
> > samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java,
> >  line 69
> > <https://reviews.apache.org/r/51142/diff/5/?file=1493801#file1493801line69>
> >
> >     I would prefer to follow the same pattern as KafkaSystemConsumer, i.e. 
> > passing in the HdfsSystemConsumerMetrics object instead of the 
> > MetricsRegistry. Please check the code in KafkaSystemFactory.getConsumer() 
> > to see how the metrics object is created and passed along.

I can't avoid using MetricsRegistry directly. At the very least, I have to pass 
it to the base class, BlockingEnvelopeMap (unless you want to change all the 
interfaces in BlockingEnvelopeMap as well; even if we want to do that, it seems 
beyond the scope of this RB, and I can maybe do a separate fix for it). But I 
will create an HdfsSystemConsumerMetrics.


- Hai


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/51142/#review148612
-----------------------------------------------------------


On Sept. 9, 2016, 1:34 a.m., Hai Lu wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/51142/
> -----------------------------------------------------------
> 
> (Updated Sept. 9, 2016, 1:34 a.m.)
> 
> 
> Review request for samza, Chris Pettitt, Yi Pan (Data Infrastructure), and 
> Navina Ramesh.
> 
> 
> Bugs: SAMZA-967
>     https://issues.apache.org/jira/browse/SAMZA-967
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> Add HDFS System Consumer: 
> 
> 1. System admin, partitioner
> 2. System consumer with metrics
> 
> Design doc can be found here: 
> https://issues.apache.org/jira/secure/attachment/12824078/HDFSSystemConsumer.pdf
> 
> An overview of the high level architecture: 
> 
> The system factory is used by Samza to instantiate SystemConsumer, 
> SystemProducer, and SystemAdmin for a specific system. The 
> FileDataSystemFactory can be reused for other file-system-like sources. 
> 
> HDFSSystemAdmin will start a “DirectoryPartitioner” to figure out the set of 
> HDFS files that need to be consumed for this job. The DirectoryPartitioner 
> also uses “GroupingPattern” to group files into partitions if advanced 
> partitioning is required. HDFSSystemAdmin will then persist the 
> “PartitionDescriptor” to HDFS.
> 
> The HDFSSystemConsumer will then pick up the “PartitionDescriptor” from HDFS. 
> Based on this information as well as the actual assignment of partitions, it 
> would then know which files to read from.
> 
> The initial implementation of the HDFS system consumer supports only Avro 
> data files. It’s very easy to extend it to a variety of file formats by 
> implementing the FileReader interface.
> 
> 
>                              +------------------------------------------------------------------------------+
>                              |                                                                              |
>            +-----------------+                                     HDFS                                     |
>            |   Obtain        |                                                                              |
>            |  Partition      +------+----------------------^------+---------------------------------^-------+
>            | Description            |                      |      |                                 |
>            |                        |                      |      |                                 |
>            |          +-------------v-------+              |      |       Filtering/                |
>            |          |                     |              |      +---+    Grouping                 +-----+
>            |          | HDFSAvroFileReader  |              |          |                                   |
>            |          |                     |    Persist   |          |                                   |
>            |          +---------+-----------+   Partition  |          |                                   |
>            |                    |              Description |   +------v--------------+         +----------+----------+
>            |                    |                          |   |                     |         |                     |
>            |          +---------+-----------+              |   |Directory Partitioner|         |   HDFSAvroWriter    |
>            |          |     IFileReader     |              |   |                     |         |                     |
>            |          |                     |              |   +------+--------------+         +----------+----------+
>            |          +---------+-----------+              |          |                                   |
>            |                    |                          |          |                                   |
>            |                    |                          |          |                                   |
>            |          +---------+-----------+            +-+----------+--------+               +----------+----------+
>            |          |                     |            |                     |               |                     |
>            |          | HDFSSystemConsumer  |            |   HDFSSystemAdmin   |               | HDFSSystemProducer  |
>            +---------->                     |            |                     |               |                     |
>                       +---------+-----------+            +-----------+---------+               +----------+----------+
>                                 |                                    |                                    |
>                                 +------------------------------------+------------------------------------+
>                                                                      |
>                              +---------------------------------------+--------------------------------------+
>                              |                                                                              |
>                              |                              HDFSSystemFactory                               |
>                              |                                                                              |
>                              +------------------------------------------------------------------------------+
> 
> 
> Diffs
> -----
> 
>   build.gradle 1d4eb74b1294318db8454631ddd0901596121ab2 
>   gradle/dependency-versions.gradle 47c71bfde027835682889407261d4798b629d214 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemAdmin.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/HdfsSystemConsumer.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/PartitionDescriptionUtil.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/DirectoryPartitioner.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/FileSystemAdapter.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/partitioner/HdfsFileSystemAdapter.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/AvroFileHdfsReader.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/HdfsReaderFactory.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/MultiFileHdfsReader.java PRE-CREATION 
>   samza-hdfs/src/main/java/org/apache/samza/system/hdfs/reader/SingleFileHdfsReader.java PRE-CREATION 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsConfig.scala 61b7570afae3219b618c8830905035063941bdd7 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemAdmin.scala 92eb4472533db67dca01f075cb460581b4bdac0d 
>   samza-hdfs/src/main/scala/org/apache/samza/system/hdfs/HdfsSystemFactory.scala ef3c20a097ddf2feecaf8b0ad4587ea4bf6570b7 
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestHdfsSystemConsumer.java PRE-CREATION 
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/TestPartitionDesctiptionUtil.java PRE-CREATION 
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestDirectoryPartitioner.java PRE-CREATION 
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/partitioner/TestHdfsFileSystemAdapter.java PRE-CREATION 
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestAvroFileHdfsReader.java PRE-CREATION 
>   samza-hdfs/src/test/java/org/apache/samza/system/hdfs/reader/TestMultiFileHdfsReader.java PRE-CREATION 
>   samza-hdfs/src/test/resources/integTest/emptyTestFile PRE-CREATION 
>   samza-hdfs/src/test/resources/partitioner/testfile01 PRE-CREATION 
>   samza-hdfs/src/test/resources/partitioner/testfile02 PRE-CREATION 
>   samza-hdfs/src/test/resources/reader/TestEvent.avsc PRE-CREATION 
>   samza-hdfs/src/test/scala/org/apache/samza/system/hdfs/TestHdfsSystemProducerTestSuite.scala 261310d03de204718621f601117f016da14841df 
>   samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala 4e328a5f8c2b496a71e36c106339b7af263c96c7 
> 
> Diff: https://reviews.apache.org/r/51142/diff/
> 
> 
> Testing
> -------
> 
> unit tests pass.
> 
> manually tested by writing a real hdfs samza job and deploying to a yarn 
> cluster.
> 
> 
> Thanks,
> 
> Hai Lu
> 
>
