[ https://issues.apache.org/jira/browse/PIG-4209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14256560#comment-14256560 ]

liyunzhang_intel commented on PIG-4209:
---------------------------------------

In PIG-4209_1.patch, I made the following changes:
Added two new classes, StreamConverter.java and POStreamSpark.java.
StreamConverter contains StreamFunction, which makes POStream work in Spark.
POStreamSpark extends POStream and only changes POStream#getNextTuple: it adds
a boolean parameter "proceed" to this function.
This parameter indicates whether the current record is the last input record:
"proceed" is true if the current record is not the end of the input, and false
otherwise.
{code}
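  public Result getNextTuple(boolean proceed) throws ExecException {
    // proceed == true:  more input records follow the current one
    // proceed == false: the current record is the last input record
    .....
  }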
{code}
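A minimal sketch of how a caller could derive "proceed" from a partition's iterator, assuming the operator only needs to know whether more records follow the current one. ProceedFlagSketch, BufferingOperator, getNext and runPartition are hypothetical names for illustration; they are not Pig or Spark APIs and are not taken from the patch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical driver, NOT Pig's StreamFunction: it feeds each record of one
// partition to an operator and derives "proceed" from the partition iterator.
class ProceedFlagSketch {
    static List<String> runPartition(Iterator<String> input, BufferingOperator op) {
        List<String> output = new ArrayList<>();
        while (input.hasNext()) {
            String record = input.next();
            // true while more records follow; false only for the last record
            boolean proceed = input.hasNext();
            output.addAll(op.getNext(record, proceed));
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> out = runPartition(List.of("a", "b", "c").iterator(), new BufferingOperator());
        System.out.println(out); // prints [a,b,c]
    }
}

// Hypothetical stand-in for a streaming operator, NOT Pig's POStreamSpark:
// it buffers records and only emits output once it sees the last record.
class BufferingOperator {
    private final List<String> buffer = new ArrayList<>();

    List<String> getNext(String record, boolean proceed) {
        buffer.add(record);
        if (proceed) {
            return List.of(); // more input follows: emit nothing yet
        }
        // proceed == false: end of input, flush everything buffered so far
        List<String> out = List.of(String.join(",", buffer));
        buffer.clear();
        return out;
    }
}
```

In this sketch an empty partition never calls getNext at all, so the operator never sees proceed == false; any end-of-input cleanup for that case would need separate handling.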

POOutputConsumerIterator has the following change: POOutputConsumerIterator#readNext
now uses a while loop instead of recursion. This does not change the behavior,
because the conditions for leaving the while loop are the same as the
conditions for returning from the function. The other benefit of the loop is
lower thread stack usage: the recursive version adds a stack frame for every
skipped result, so a long run of skipped results could overflow the stack.
Previous code:
{code}
private void readNext() {
    try {
        if (result != null && !returned) {
            return;
        }
        // see PigGenericMapBase
        if (result == null) {
            if (!input.hasNext()) {
                finished = true;
                return;
            }
            Tuple v1 = input.next();
            attach(v1);
        }
        result = getNextResult();
        returned = false;
        switch (result.returnStatus) {
        case POStatus.STATUS_OK:
            returned = false;
            break;
        case POStatus.STATUS_NULL:
            returned = true; // skip: see PigGenericMapBase
            readNext(); // recursive call: adds a stack frame
            break;
        case POStatus.STATUS_EOP:
            finished = !input.hasNext();
            if (!finished) {
                result = null;
                readNext(); // recursive call: adds a stack frame
            }
            break;
        case POStatus.STATUS_ERR:
            throw new RuntimeException("Error while processing " + result);
        }
    } catch (ExecException e) {
        throw new RuntimeException(e);
    }
}
{code}
Code in PIG-4209_1.patch:
{code}
private void readNext() {
    while (true) {
        try {
            if (result != null && !returned) {
                return;
            }
            // see PigGenericMapBase
            if (result == null) {
                if (!input.hasNext()) {
                    finished = true;
                    return;
                }
                Tuple v1 = input.next();
                attach(v1);
            }
            result = getNextResult();
            returned = false;
            switch (result.returnStatus) {
                case POStatus.STATUS_OK:
                    returned = false;
                    break; // next iteration returns via the check at the top
                case POStatus.STATUS_NULL:
                    returned = true; // skip: see PigGenericMapBase
                    break; // loop again instead of calling readNext()
                case POStatus.STATUS_EOP:
                    finished = !input.hasNext();
                    if (!finished) {
                        result = null;
                    }
                    break; // loop again instead of calling readNext()
                case POStatus.STATUS_ERR:
                    throw new RuntimeException("Error while processing " + result);
            }
        } catch (ExecException e) {
            throw new RuntimeException(e);
        }
    }
}
{code}
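A minimal, self-contained sketch of the same recursion-to-loop transformation, assuming a simplified model in which a null element stands in for a STATUS_NULL result that must be skipped. SkipNullsSketch, nextRecursive, nextLoop and drain are hypothetical names, not part of Pig. The sketch counts call depth to show that both versions return the same records, while only the recursive one grows the stack with each skipped element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SkipNullsSketch {
    // Hypothetical model: a null element plays the role of a STATUS_NULL result.
    static int depth = 0;
    static int maxDepth = 0;

    // Recursive form: each skipped element costs one more stack frame.
    static Integer nextRecursive(Iterator<Integer> in) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        try {
            if (!in.hasNext()) {
                return null; // finished
            }
            Integer v = in.next();
            if (v == null) {
                return nextRecursive(in); // skip by recursing
            }
            return v;
        } finally {
            depth--;
        }
    }

    // Loop form: same exit conditions, but the call depth stays constant.
    static Integer nextLoop(Iterator<Integer> in) {
        depth++;
        maxDepth = Math.max(maxDepth, depth);
        try {
            while (true) {
                if (!in.hasNext()) {
                    return null; // finished
                }
                Integer v = in.next();
                if (v != null) {
                    return v;
                }
                // v == null: skip by going around the loop instead of recursing
            }
        } finally {
            depth--;
        }
    }

    // Drains the data with the chosen strategy and records the max call depth.
    static List<Integer> drain(List<Integer> data, boolean useLoop) {
        depth = 0;
        maxDepth = 0;
        Iterator<Integer> in = data.iterator();
        List<Integer> out = new ArrayList<>();
        while (true) {
            Integer v = useLoop ? nextLoop(in) : nextRecursive(in);
            if (v == null) {
                return out;
            }
            out.add(v);
        }
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        data.add(1);
        for (int i = 0; i < 1000; i++) {
            data.add(null); // 1000 consecutive elements to skip
        }
        data.add(2);
        System.out.println(drain(data, false) + " maxDepth=" + maxDepth); // [1, 2] maxDepth=1001
        System.out.println(drain(data, true) + " maxDepth=" + maxDepth);  // [1, 2] maxDepth=1
    }
}
```

The recursive depth grows with the length of the longest run of skipped elements, which is why a loop is the safer choice when an operator may filter out many consecutive records.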

> Make stream work with Spark
> ---------------------------
>
>                 Key: PIG-4209
>                 URL: https://issues.apache.org/jira/browse/PIG-4209
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Praveen Rachabattuni
>            Assignee: liyunzhang_intel
>         Attachments: PIG-4209.patch, test_harnesss_1417075517
>
>
> Related e2e-tests: StreamingLocal_[1-18]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
