[ 
https://issues.apache.org/jira/browse/HUDI-1621?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xie Lei updated HUDI-1621:
--------------------------
    Description: 
If the parallelism in the constructor of StreamWriteOperatorFactory is not 
equal to the parallelism of the Operator, the flink task will run failed.

For example, if the StreamWriteOperatorFactory's parallelism less than 
Operator's parallelism
{code:java}
6807 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Trying to recover from a global 
failure.
java.lang.ArrayIndexOutOfBoundsException: 3
  at 
org.apache.hudi.operator.StreamWriteOperatorCoordinator.handleEventFromOperator(StreamWriteOperatorCoordinator.java:181)
  at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:191)
  at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:952)
  at 
org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:473)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
  at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
  at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
{code}
 

  was:
If the parallelism in the constructor of StreamWriteOperatorFactory is not 
equal to the parallelism of the Operator, the flink task will run failed.

For example, if the StreamWriteOperatorFactory's parallelism less than 
Operator's parallelism
{code:java}
6807 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Trying to recover from a global 
failure.6807 [flink-akka.actor.default-dispatcher-4] INFO  
org.apache.flink.runtime.jobmaster.JobMaster  - Trying to recover from a global 
failure.java.lang.ArrayIndexOutOfBoundsException: 3 at 
org.apache.hudi.operator.StreamWriteOperatorCoordinator.handleEventFromOperator(StreamWriteOperatorCoordinator.java:181)
 at 
org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:191)
 at 
org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:952)
 at 
org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:473)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498) at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
{code}
 


> Gets the parallelism from context when init StreamWriteOperatorCoordinator
> --------------------------------------------------------------------------
>
>                 Key: HUDI-1621
>                 URL: https://issues.apache.org/jira/browse/HUDI-1621
>             Project: Apache Hudi
>          Issue Type: Improvement
>          Components: Flink Integration
>            Reporter: Xie Lei
>            Priority: Major
>
> If the parallelism in the constructor of StreamWriteOperatorFactory is not 
> equal to the parallelism of the Operator, the flink task will run failed.
> For example, if the StreamWriteOperatorFactory's parallelism less than 
> Operator's parallelism
> {code:java}
> 6807 [flink-akka.actor.default-dispatcher-4] INFO  
> org.apache.flink.runtime.jobmaster.JobMaster  - Trying to recover from a 
> global failure.
> java.lang.ArrayIndexOutOfBoundsException: 3
>   at 
> org.apache.hudi.operator.StreamWriteOperatorCoordinator.handleEventFromOperator(StreamWriteOperatorCoordinator.java:181)
>   at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.handleEventFromOperator(OperatorCoordinatorHolder.java:191)
>   at 
> org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:952)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to