Round-robin partitioning is not idempotent, so you can't have exactly-once processing with it.
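
To illustrate the point, here is a minimal stand-alone sketch (plain Java, not the Apex API; all class and method names are hypothetical). It assumes the round-robin counter is not restored to the exact value it had before a failure, so replaying the same window routes the same tuples to different partitions, whereas a key-based choice depends only on the tuple itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; it only mimics the shape of getPartition().
class RoundRobinReplayDemo {

  // Round-robin: the result depends on a counter, not on the tuple.
  static class RoundRobin {
    private int n = 0;
    private final int nPartitions;

    RoundRobin(int nPartitions) {
      this.nPartitions = nPartitions;
    }

    int getPartition(String tuple) {
      return n++ % nPartitions;
    }
  }

  // Key-based: the result depends only on the tuple's content.
  static int keyed(String tuple, int nPartitions) {
    return Math.floorMod(tuple.hashCode(), nPartitions);
  }

  // Routes every tuple of a window and records the chosen partitions.
  static List<Integer> route(RoundRobin codec, List<String> window) {
    List<Integer> partitions = new ArrayList<>();
    for (String tuple : window) {
      partitions.add(codec.getPartition(tuple));
    }
    return partitions;
  }

  public static void main(String[] args) {
    List<String> window = Arrays.asList("a", "b", "c");

    RoundRobin codec = new RoundRobin(3);
    codec.getPartition("earlier"); // a tuple from a previous window moved the counter
    List<Integer> firstRun = route(codec, window);

    // Simulated recovery: a fresh codec (counter reset) replays the same window.
    List<Integer> replay = route(new RoundRobin(3), window);

    System.out.println("first run: " + firstRun);
    System.out.println("replay:    " + replay);
    System.out.println("same routing on replay? " + firstRun.equals(replay));
  }
}
```

With a key-based codec, `keyed("a", 3)` gives the same answer on every run, which is what replay needs.
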
On Sat, Oct 15, 2016 at 4:49 PM Munagala Ramanath <[email protected]>
wrote:
> If you want round-robin distribution, which gives you uniform load across
> all partitions, you can use a StreamCodec like this (provided the number of
> partitions is known and static):
>
> public class CatagoryStreamCodec extends KryoSerializableStreamCodec<Object> {
>   private int n = 0;
>
>   @Override
>   public int getPartition(Object in) {
>     return n++ % nPartitions; // nPartitions is the number of partitions
>   }
> }
>
> If you want certain category names to go to certain partitions, you can
> create that mapping within the StreamCodec (map category names to integers
> in the range 0..nPartitions-1) and, for each tuple, look up the category
> name in the map and return the corresponding value.
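
The lookup described above could be sketched roughly as follows. This is plain Java, independent of the Apex API; the class name, `assign`, and the hash fallback for unmapped names are assumptions for illustration, not something from the thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: maps category names to partitions in 0..nPartitions-1.
class CategoryPartitionMap {
  private final Map<String, Integer> partitionByCategory = new HashMap<>();
  private final int nPartitions;

  CategoryPartitionMap(int nPartitions) {
    if (nPartitions <= 0) {
      throw new IllegalArgumentException("nPartitions must be positive");
    }
    this.nPartitions = nPartitions;
  }

  // Pins a category name to a specific partition.
  void assign(String category, int partition) {
    if (partition < 0 || partition >= nPartitions) {
      throw new IllegalArgumentException("partition out of range: " + partition);
    }
    partitionByCategory.put(category, partition);
  }

  // Returns the pinned partition, or a hash-based one for unmapped names
  // (the fallback is an assumption, not part of the original suggestion).
  int getPartition(String category) {
    Integer pinned = partitionByCategory.get(category);
    if (pinned != null) {
      return pinned;
    }
    if (category == null) {
      return 0;
    }
    return Math.floorMod(category.hashCode(), nPartitions);
  }

  public static void main(String[] args) {
    CategoryPartitionMap map = new CategoryPartitionMap(4);
    map.assign("books", 2);
    System.out.println(map.getPartition("books")); // pinned partition
    System.out.println(map.getPartition("music")); // hash fallback, in 0..3
  }
}
```

A codec's getPartition could delegate to such a map after extracting the category name from the tuple.
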
>
> Ram
> ....
>
> On Fri, Oct 14, 2016 at 1:17 PM, Sunil Parmar <[email protected]>
> wrote:
>
> We’re using a StreamCodec to distribute data consistently across the operator
> partitions for parallel processing. Our requirement is to serialize
> processing of the data based on a particular tuple attribute; let’s call it
> ‘catagory_name’. To process different category names in parallel, we’ve
> written our stream codec as follows.
>
> public class CatagoryStreamCodec extends KryoSerializableStreamCodec<Object> {
>   private static final long serialVersionUID = -687991492884005033L;
>
>   @Override
>   public int getPartition(Object in) {
>     try {
>       InputTuple tuple = (InputTuple) in;
>       String partitionKey = tuple.getName();
>       if (partitionKey != null) {
>         return partitionKey.hashCode();
>       }
>     } catch (ClassCastException e) {
>       // Assumed handling: the catch block was not included in the post.
>     }
>     return 0; // assumed fallback for null keys or unexpected tuple types
>   }
> }
>
> It works as expected, but we observed uneven partitioning when we ran this
> in the production environment with 20 partitions of the operator that
> follows the codec in the DAG.
>
> - Some operator instances didn’t process any data.
> - Some operator instances processed as many tuples as all the others
> combined.
>
>
> Questions:
>
> - Is the getPartition method supposed to return the actual partition, or
> just some lower bits used for deciding the partition?
> - The number of partitions is set in the application properties and can vary
> between deployments or environments. Is it best practice to use that
> property in the stream codec?
> - Is there a recommended hash function that gives consistent variation in
> the lower bits when the data has little variety? We have ~100+ categories
> and I’m thinking of having 10+ operator partitions.
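
On the hash question, one possible approach is sketched below in plain Java, independent of the Apex API. It assumes the codec should return a value already in the range 0..nPartitions-1, as suggested earlier in the thread, and that `nPartitions` is supplied from the application property. The class and method names are hypothetical, and the `category_N` names are made-up test data. The mixing step folds the high bits of `String.hashCode()` into the low bits (the same step `java.util.HashMap` applies internally), and `Math.floorMod` keeps the result non-negative even when the hash is negative.

```java
import java.util.Arrays;

// Hypothetical helper for checking how evenly category names spread out.
class CategoryHashSpread {

  // Folds the high 16 bits into the low 16 bits so they influence the result.
  static int spread(int h) {
    return h ^ (h >>> 16);
  }

  // Maps a category name to a partition in 0..nPartitions-1.
  static int partitionFor(String category, int nPartitions) {
    if (category == null) {
      return 0; // assumed fallback for tuples without a category
    }
    return Math.floorMod(spread(category.hashCode()), nPartitions);
  }

  // Counts how many of the generated category names land on each partition.
  static int[] histogram(int nCategories, int nPartitions) {
    int[] counts = new int[nPartitions];
    for (int i = 0; i < nCategories; i++) {
      counts[partitionFor("category_" + i, nPartitions)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // Roughly the numbers from the question: ~100 categories, 10 partitions.
    System.out.println(Arrays.toString(histogram(100, 10)));
  }
}
```

Running the histogram against the real category names before deploying shows whether any partition would sit idle or be overloaded. With only about 100 distinct keys some imbalance is expected whichever hash is used, and key-based routing cannot even out categories that carry very different tuple volumes.
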
>
>
> Thanks,
> Sunil
>
>