[
https://issues.apache.org/jira/browse/PIG-4209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14256560#comment-14256560
]
liyunzhang_intel commented on PIG-4209:
---------------------------------------
PIG-4209_1.patch makes the following changes:
It adds two new classes, StreamConverter.java and POStreamSpark.java.
StreamConverter contains StreamFunction, which makes POStream work in Spark.
POStreamSpark extends POStream and only provides its own version of
POStream#getNextTuple, which takes an additional "proceed" parameter.
This parameter indicates whether the current record is the end of the input:
if "proceed" is "true", it is not the end; if "proceed" is "false", it is.
{code}
public Result getNextTuple(boolean proceed) throws ExecException {
.....
}
{code}
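As a rough illustration of the "proceed" idea, here is a minimal, self-contained sketch. It does not use Pig classes: BatchingOperator, accept and runAll are hypothetical names, and deriving "proceed" from Iterator#hasNext is an assumption about how a caller could compute the flag, not necessarily what the patch does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class ProceedSketch {
    /** Hypothetical stand-in for a streaming operator; this is not Pig's POStream. */
    static class BatchingOperator {
        private final List<String> pending = new ArrayList<>();

        /**
         * Accepts one record. While proceed is true, more input is expected, so the
         * record is buffered and nothing is emitted. When proceed is false, this is
         * the last record, so everything buffered so far is flushed.
         */
        List<String> accept(String record, boolean proceed) {
            pending.add(record.toUpperCase());
            if (proceed) {
                return new ArrayList<>();
            }
            List<String> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }
    }

    /** Drives the operator over an iterator, computing proceed for each record. */
    static List<String> runAll(Iterator<String> input) {
        BatchingOperator op = new BatchingOperator();
        List<String> out = new ArrayList<>();
        while (input.hasNext()) {
            String record = input.next();
            // Assumption: proceed is true exactly when more records follow.
            boolean proceed = input.hasNext();
            out.addAll(op.accept(record, proceed));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runAll(Arrays.asList("a", "b", "c").iterator()));
    }
}
```

In this sketch the operator only learns that the input is exhausted through the flag passed with the last record, which is the role the comment describes for "proceed".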
POOutputConsumerIterator has some changes: POOutputConsumerIterator#readNext
now uses a while loop instead of recursion. Adding the while loop does not
change the execution, because the condition for leaving the loop is the same
as the condition for returning from the function. The other reason to use a
while loop instead of recursion is to reduce thread stack usage.
Previous code:
{code}
private void readNext() {
    try {
        if (result != null && !returned) {
            return;
        }
        // see PigGenericMapBase
        if (result == null) {
            if (!input.hasNext()) {
                finished = true;
                return;
            }
            Tuple v1 = input.next();
            attach(v1);
        }
        result = getNextResult();
        returned = false;
        switch (result.returnStatus) {
        case POStatus.STATUS_OK:
            returned = false;
            break;
        case POStatus.STATUS_NULL:
            returned = true; // skip: see PigGenericMapBase
            readNext();
            break;
        case POStatus.STATUS_EOP:
            finished = !input.hasNext();
            if (!finished) {
                result = null;
                readNext();
            }
            break;
        case POStatus.STATUS_ERR:
            throw new RuntimeException("Error while processing " + result);
        }
    } catch (ExecException e) {
        throw new RuntimeException(e);
    }
}
{code}
Code in PIG-4209_1.patch:
{code}
private void readNext() {
    while (true) {
        try {
            if (result != null && !returned) {
                return;
            }
            // see PigGenericMapBase
            if (result == null) {
                if (!input.hasNext()) {
                    finished = true;
                    return;
                }
                Tuple v1 = input.next();
                attach(v1);
            }
            result = getNextResult();
            returned = false;
            switch (result.returnStatus) {
            case POStatus.STATUS_OK:
                returned = false;
                break;
            case POStatus.STATUS_NULL:
                returned = true; // skip: see PigGenericMapBase
                break;
            case POStatus.STATUS_EOP:
                finished = !input.hasNext();
                if (!finished) {
                    result = null;
                }
                break;
            case POStatus.STATUS_ERR:
                throw new RuntimeException("Error while processing " + result);
            }
        } catch (ExecException e) {
            throw new RuntimeException(e);
        }
    }
}
{code}
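To show why the loop form matters for stack usage, here is a small standalone sketch, independent of Pig. SkipLoopSketch, nextNonNullRecursive, nextNonNullLoop and nullsThen are hypothetical names used only for illustration. Skipping a null element stands in for the STATUS_NULL case: the recursive version adds one stack frame per skipped element, while the loop version uses a single frame.

```java
import java.util.Iterator;

public class SkipLoopSketch {
    /** Recursive skip: every skipped null adds one stack frame. */
    static Integer nextNonNullRecursive(Iterator<Integer> it) {
        if (!it.hasNext()) {
            return null;
        }
        Integer v = it.next();
        if (v == null) {
            return nextNonNullRecursive(it);
        }
        return v;
    }

    /** Loop-based skip: same result, constant stack depth. */
    static Integer nextNonNullLoop(Iterator<Integer> it) {
        while (it.hasNext()) {
            Integer v = it.next();
            if (v != null) {
                return v;
            }
        }
        return null;
    }

    /** Iterator that yields n nulls followed by a single value. */
    static Iterator<Integer> nullsThen(int n, int last) {
        return new Iterator<Integer>() {
            private int i = 0;

            @Override
            public boolean hasNext() {
                return i <= n;
            }

            @Override
            public Integer next() {
                return i++ < n ? null : last;
            }
        };
    }

    public static void main(String[] args) {
        int skips = 10_000_000;
        System.out.println("loop: " + nextNonNullLoop(nullsThen(skips, 42)));
        try {
            System.out.println("recursive: " + nextNonNullRecursive(nullsThen(skips, 42)));
        } catch (StackOverflowError e) {
            // Likely with default JVM stack sizes, since Java does not eliminate tail calls.
            System.out.println("recursive: stack overflow");
        }
    }
}
```

Both methods return the same value for any input that the recursive version can handle, which mirrors the claim that the while loop in readNext leaves the behavior unchanged.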
> Make stream work with Spark
> ---------------------------
>
> Key: PIG-4209
> URL: https://issues.apache.org/jira/browse/PIG-4209
> Project: Pig
> Issue Type: Sub-task
> Components: spark
> Reporter: Praveen Rachabattuni
> Assignee: liyunzhang_intel
> Attachments: PIG-4209.patch, test_harnesss_1417075517
>
>
> Related e2e-tests: StreamingLocal_[1-18]
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)