Hi Ram,
I tried it the same way, but the problem I face is that the return type of my
definePartitions() method does not match the one declared in the superclass. As a
temporary workaround I changed AbstractFileInputOperator, but I am looking for a
better way to do this.
############################# definePartitions() method #############################
@Override
public Collection<Partition<AbstractFileInputOperator<String>>> definePartitions(
    Collection<Partition<AbstractFileInputOperator<String>>> partitions,
    PartitioningContext context) {
  final int prevCount = partitions.size();
  if (1 < prevCount) {
    throw new RuntimeException("Error: Dynamic repartition not supported");
  }
  // compute first and last indices of partitions for each directory
  //final int numDirs = directories.length, numParCounts = partitionCounts.length;
  final int numDirs = inputDirectories.size(), numParCounts = partCounts.size();
  //final int[] sliceFirstIndex = new int[numDirs];
  Map<String,Integer> sliceFirstIndex = new HashMap<String,Integer>(numDirs);
  LOG.info("definePartitions: prevCount = {}, directories.size = {}, "
      + "partitionCounts.size = {}", prevCount, numDirs, numParCounts);
  int nPartitions = 0; // desired number of partitions
  for (String sourceId : inputDirectories.keySet()) {
    sliceFirstIndex.put(sourceId, nPartitions);
    final int nP = Integer.parseInt(partCounts.get(sourceId));
    LOG.info("definePartitions: sourceId = {}, no of partitions = {}, dir = {}",
        sourceId, nP, inputDirectories.get(sourceId));
    nPartitions += nP;
  }
  /*if (1 == nPartitions) {
    LOG.info("definePartitions: Nothing to do in definePartitions");
    return partitions; // nothing to do
  }*/
  if (nPartitions <= 0) { // error
    final String msg = String.format("Error: Bad number of partitions %d%n", nPartitions);
    LOG.error(msg);
    throw new RuntimeException(msg);
  }
  this.partitionCount = nPartitions;
  LOG.debug("definePartitions: Creating {} partitions", nPartitions);
  /*
   * Create partitions of scanners, scanner's partition method will do
   * state transfer for DirectoryScanner objects.
   */
  Kryo kryo = new Kryo();
  SlicedDirectoryScanner sds = (SlicedDirectoryScanner) scanner;
  List<SlicedDirectoryScanner> scanners =
      sds.partition(nPartitions, inputDirectories, partCounts);
  // return value: new list of partitions (includes old list)
  List<Partition<AbstractFileReader>> newPartitions = new ArrayList<>(nPartitions);
  // parallel list of storage managers
  Collection<IdempotentStorageManager> newManagers = new ArrayList<>(nPartitions);
  // setup new partitions
  LOG.info("definePartitions: setting up {} new partitions with {} monitored directories",
      nPartitions, numDirs);
  final IdempotentStorageManager ism = getIdempotentStorageManager();
  for (String sourceId : inputDirectories.keySet()) {
    int first = sliceFirstIndex.get(sourceId);
    int last = first + Integer.parseInt(partCounts.get(sourceId));
    String dir = Helper.changeInputDirectory(sourceId,
        inputDirectories.get(sourceId), snapDates.get(sourceId));
    //String dir = inputDirectories.get(sourceId);
    String inConfig = inputConfigFiles.get(sourceId);
    String outConfig = outputConfigFiles.get(sourceId);
    String loadDate = snapDates.get(sourceId);
    LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
    LOG.info("definePartitions: directory = {}, inputConfigFile = {}, "
        + "outputConfigFile = {}, loadDate = {}", dir, inConfig, outConfig, loadDate);
    for (int i = first; i < last; ++i) {
      AbstractFileReader oper = (AbstractFileReader) cloneObject(kryo, this);
      oper.setDirectory(dir);
      oper.setInputConfFile(inConfig);
      oper.setOutputConfFile(outConfig);
      oper.setSourceId(sourceId);
      oper.setLoadDate(loadDate);
      oper.setOldInputDir(inputDirectories.get(sourceId));
      SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
      scn.setStartIndex(first);
      scn.setEndIndex(last);
      scn.setDirectory(dir);
      oper.setScanner(scn);
      newPartitions.add(new DefaultPartition<>(oper));
      newManagers.add(oper.getIdempotentStorageManager());
    }
  }
  ism.partitioned(newManagers, null);
  LOG.info("definePartitions: returning {} partitions", newPartitions.size());
  return newPartitions;
}
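One possible way around the return-type mismatch, without touching the
superclass, may be to keep the list's element type equal to the superclass type
and store subclass instances in it. Java generics are invariant, so a
List<Partition<AbstractFileReader>> is not a
Collection<Partition<AbstractFileInputOperator<String>>>, even though each reader
is such an operator. The sketch below is only an illustration: Partition,
DefaultPartition, BaseFileInput and MyFileReader are simplified, hypothetical
stand-ins for the real Apex types, and the int parameter replaces
PartitioningContext.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-ins for the Apex types; only the generic shape matters here.
interface Partition<T> {
    T getPartitionedInstance();
}

class DefaultPartition<T> implements Partition<T> {
    private final T instance;

    DefaultPartition(T instance) {
        this.instance = instance;
    }

    @Override
    public T getPartitionedInstance() {
        return instance;
    }
}

// Stand-in for AbstractFileInputOperator<T>.
abstract class BaseFileInput<T> {
    abstract Collection<Partition<BaseFileInput<T>>> definePartitions(
            Collection<Partition<BaseFileInput<T>>> partitions, int count);
}

// Stand-in for the subclass that carries extra per-partition fields.
class MyFileReader extends BaseFileInput<String> {
    String sourceId;

    @Override
    Collection<Partition<BaseFileInput<String>>> definePartitions(
            Collection<Partition<BaseFileInput<String>>> partitions, int count) {
        // The element type is the superclass, so the list matches the declared return type.
        List<Partition<BaseFileInput<String>>> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // A subclass-typed local variable keeps the extra setters reachable.
            MyFileReader oper = new MyFileReader();
            oper.sourceId = "src-" + i;
            // Wrap the subclass instance in a partition typed on the superclass.
            result.add(new DefaultPartition<BaseFileInput<String>>(oper));
        }
        return result;
    }
}
```

In the method above, the equivalent change would presumably be to declare
newPartitions with AbstractFileInputOperator<String> as the element type while
keeping oper typed as AbstractFileReader.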
Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:[email protected]]
Sent: 2016, June, 28 2:35 PM
To: [email protected]
Subject: Re: Reading Multiple directories in parallel
You can add those properties to your own class (the one that extends
AbstractFileInputOperator) and simply cast the clone to that class, so change:
AbstractFileInputOperator<String> oper = cloneObject(kryo, this);
to something like:
MyFileInputOperator<String> oper = (MyFileInputOperator) cloneObject(kryo, this);
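The cast suggested above can be tried out in isolation. The following is a
minimal sketch under stated assumptions: it uses plain Java serialization as a
stand-in for the Kryo-based cloneObject, and BaseOperator, ConfiguredOperator and
CloneDemo are hypothetical names, not Apex classes. The point it illustrates is
that the clone has the same runtime class as the object being cloned, so the
downcast gives access to the subclass setters.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical base operator; stands in for AbstractFileInputOperator<String>.
class BaseOperator implements Serializable {
    private static final long serialVersionUID = 1L;
    private String directory;

    public String getDirectory() { return directory; }
    public void setDirectory(String directory) { this.directory = directory; }
}

// Hypothetical subclass holding the extra per-partition settings.
class ConfiguredOperator extends BaseOperator {
    private static final long serialVersionUID = 1L;
    private String sourceId;
    private String configFile;

    public String getSourceId() { return sourceId; }
    public void setSourceId(String sourceId) { this.sourceId = sourceId; }
    public String getConfigFile() { return configFile; }
    public void setConfigFile(String configFile) { this.configFile = configFile; }
}

final class CloneDemo {
    // Deep copy through Java serialization (assumption: replaces the Kryo clone).
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T cloneObject(T src) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(src);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("clone failed", e);
        }
    }

    // Clones the template and configures the copy through the subclass setters.
    static ConfiguredOperator partitionFor(
            BaseOperator template, String dir, String sourceId, String configFile) {
        // Statically the clone is a BaseOperator; the cast succeeds only if the
        // template's runtime class is ConfiguredOperator.
        ConfiguredOperator oper = (ConfiguredOperator) cloneObject(template);
        oper.setDirectory(dir);
        oper.setSourceId(sourceId);
        oper.setConfigFile(configFile);
        return oper;
    }
}
```

If the template were a plain BaseOperator, the cast would fail with a
ClassCastException, which is why definePartitions has to live in (and clone) the
subclass.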
On Tue, Jun 28, 2016 at 11:17 AM, Mukkamula, Suryavamshivardhan (CWM-NR)
<[email protected]> wrote:
Hi Ram,
I would like to set parameters for each partition, as shown below. Each operator
should be given its own configuration file and source identifier. If there is
any other way, please let me know.
In my current definePartitions() method I do something similar to the code below,
but I had to add setter and getter methods to the AbstractFileInputOperator class.
for (int j = 0; j < numDirs; ++j) {
  int first = sliceFirstIndex[j];
  int last = first + partitionCounts[j];
  String dir = directories[j];
  LOG.info("definePartitions: first = {}, last = {}, dir = {}", first, last, dir);
  for (int i = first; i < last; ++i) {
    AbstractFileInputOperator<String> oper = cloneObject(kryo, this);
    oper.setDirectory(dir);
    oper.setSourceId(<sourceId>);
    oper.setConfigFile(<fileName>);
    //oper.setpIndex(i);
    SlicedDirectoryScanner scn = (SlicedDirectoryScanner) scanners.get(i);
    scn.setStartIndex(first);
    scn.setEndIndex(last);
    scn.setDirectory(dir);
    oper.setScanner(scn);
    newPartitions.add(new DefaultPartition<>(oper));
    newManagers.add(oper.getIdempotentStorageManager());
  }
}
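The index arithmetic in the loop above (first comes from sliceFirstIndex, last
is first plus the partition count) amounts to a running sum over the
per-directory partition counts. A minimal sketch of that computation, keyed by
source identifier, might look like this; SliceIndex and firstIndices are
hypothetical names used only for illustration, and a LinkedHashMap is assumed so
that iteration order is stable.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SliceIndex {
    // Maps each sourceId to the index of its first partition. Indices are
    // assigned cumulatively in the iteration order of partitionCounts.
    static Map<String, Integer> firstIndices(Map<String, Integer> partitionCounts) {
        Map<String, Integer> first = new LinkedHashMap<>();
        int next = 0; // index of the next unassigned partition
        for (Map.Entry<String, Integer> e : partitionCounts.entrySet()) {
            if (e.getValue() <= 0) {
                throw new IllegalArgumentException(
                    "Bad number of partitions for " + e.getKey() + ": " + e.getValue());
            }
            first.put(e.getKey(), next);
            next += e.getValue();
        }
        return first;
    }
}
```

For a source with first index f and count c, its partitions then occupy the
half-open range from f (inclusive) to f + c (exclusive), which is the range the
inner loop walks.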
Regards,
Surya Vamshi
From: Munagala Ramanath [mailto:[email protected]]
Sent: 2016, June, 28 2:03 PM
To: [email protected]
Subject: Re: Reading Multiple directories in parallel
Not sure I fully understand the question, but you can add whatever fields you
need to your class that extends AbstractFileInputOperator. For example,
https://github.com/DataTorrent/examples/blob/master/tutorials/fileIO-multiDir/src/main/java/com/example/fileIO/FileReaderMultiDir.java
defines the fields directories and partitionCounts.
You can then set these fields as needed in definePartitions.
Ram
On Tue, Jun 28, 2016 at 10:31 AM, Mukkamula, Suryavamshivardhan (CWM-NR)
<[email protected]> wrote:
Hi Ram,
Can you please suggest how I could add another variable (like ‘directory’)
while creating multiple partitions of AbstractFileInputOperator in the
definePartitions method?
I have currently added the variables to AbstractFileInputOperator itself, which I
guess is not the best way.
These variables are basically used to scan directories in parallel, each one
differently.
Regards,
Surya Vamshi
_______________________________________________________________________
If you received this email in error, please advise the sender (by return email
or otherwise) immediately. You have consented to receive the attached
electronically at the above-noted email address; please retain a copy of this
confirmation for future reference.
Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur
immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté
de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse
courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation
pour les fins de reference future.