[ https://issues.apache.org/jira/browse/PIG-5205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
liyunzhang_intel resolved PIG-5205. ----------------------------------- Resolution: Fixed > Duplicate record key info in GlobalRearrangeConverter#ToGroupKeyValueFunction > ----------------------------------------------------------------------------- > > Key: PIG-5205 > URL: https://issues.apache.org/jira/browse/PIG-5205 > Project: Pig > Issue Type: Sub-task > Components: spark > Reporter: liyunzhang_intel > Assignee: liyunzhang_intel > Fix For: spark-branch > > Attachments: PIG-5205.patch > > > In > org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction > {code} > @Override > public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) { > try { > .... > List<Iterator<Tuple>> tupleIterators = new > ArrayList<Iterator<Tuple>>(); > for (int j = 0; j < bags.length; j ++) { > Seq<Tuple> bag = bags[j]; > Iterator<Tuple> iterator = JavaConversions > .asJavaCollection(bag).iterator(); > final int index = i; > tupleIterators.add(new IteratorTransform<Tuple, Tuple>( > iterator) { > @Override > protected Tuple transform(Tuple next) { > try { > Tuple tuple = tf.newTuple(3); > tuple.set(0, index); > # we record duplicate key info here > # for every record; we will use out.set(0, > key) later, so maybe the key info can be removed > tuple.set(1, key); > tuple.set(2, next); > return tuple; > } catch (ExecException e) { > throw new RuntimeException(e); > } > } > }); > ++ i; > } > Tuple out = tf.newTuple(2); > out.set(0, key); > out.set(1, new > IteratorUnion<Tuple>(tupleIterators.iterator())); > if (LOG.isDebugEnabled()) { > LOG.debug("ToGroupKeyValueFunction out " + out); > } > return out; > } catch (Exception e) { > throw new RuntimeException(e); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)