Hi Amit,

could this be related [1]? How do you build your job?

[1] https://bugs.eclipse.org/bugs/show_bug.cgi?id=439889

Cheers,
Till

On Wed, Jan 3, 2018 at 2:55 PM, Timo Walther <twal...@apache.org> wrote:

> Hi Amit,
>
> which of the two lambdas caused the error? I guess it was the mapper after
> the parquet input, right? In both cases this should not happen. Maybe you
> can open an issue with a small reproducible code example?
>
> Thanks.
>
> Regards,
> Timo
>
>
> Am 1/3/18 um 12:15 PM schrieb Amit Jain:
>
> Hi Timo,
>>
>> Thanks a lot! Quick re-look over the code helped me to detect used
>> lambdas.
>> I was using lambdas in two cases which are following.
>>
>> DataSet<GenericRecord> newMainDataSet = mainDataSet
>>
>>      .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
>>      .where(keySelector).equalTo(keySelector)
>>
>> *    .with((first, second) -> first != null && second != null ? second
>> : (first != null ? first : second))*    .filter(filterFunction)
>>      .returns(GenericRecord.class);
>>
>> DataSet<GenericRecord> mainDataSet =
>>
>>      mergeTableSecond.readParquet(mainPath, avroSchema, env)
>>          .withParameters(parameters)
>> *        .map(**t -> t.f1*
>> *)*        .returns(GenericRecord.class);
>>
>>
>>
>> On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <twal...@apache.org> wrote:
>>
>> Hi Amit,
>>>
>>> are you using lambdas as parameters of a Flink Function or in a member
>>> variable? If yes, can you share an lambda example that fails?
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> Am 1/3/18 um 11:41 AM schrieb Amit Jain:
>>>
>>> Hi,
>>>>
>>>> I'm writing a job to merge old data with changelogs using DataSet API
>>>> where
>>>> I'm reading changelog using TextInputFormat and old data using
>>>> HadoopInputFormat.
>>>>
>>>> I can see, job manager has successfully deployed the program flow to
>>>> worker
>>>> nodes. However, workers are immediately going to failed state because of
>>>> *Caused by: java.lang.IllegalArgumentException: Invalid lambda
>>>> deserialization*
>>>>
>>>>
>>>> Complete stack trace
>>>> java.lang.RuntimeException: The initialization of the DataSource's
>>>> outputs
>>>> caused an error: Could not read the user code wrapper: unexpected
>>>> exception
>>>> type
>>>> at
>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(
>>>> DataSourceTask.java:94)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>>>> at java.lang.Thread.run(Thread.java:748)
>>>> Caused by:
>>>> org.apache.flink.runtime.operators.util.CorruptConfigurationException:
>>>> Could not read the user code wrapper: unexpected exception type
>>>> at
>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWr
>>>> apper(TaskConfig.java:290)
>>>> at
>>>> org.apache.flink.runtime.operators.BatchTask.instantiateUser
>>>> Code(BatchTask.java:1432)
>>>> at
>>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver
>>>> .setup(ChainedMapDriver.java:39)
>>>> at
>>>> org.apache.flink.runtime.operators.chaining.ChainedDriver.
>>>> setup(ChainedDriver.java:90)
>>>> at
>>>> org.apache.flink.runtime.operators.BatchTask.initOutputs(
>>>> BatchTask.java:1299)
>>>> at
>>>> org.apache.flink.runtime.operators.DataSourceTask.initOutput
>>>> s(DataSourceTask.java:287)
>>>> at
>>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(
>>>> DataSourceTask.java:91)
>>>> ... 2 more
>>>> Caused by: java.io.IOException: unexpected exception type
>>>> at java.io.ObjectStreamClass.throwMiscException(ObjectStreamCla
>>>> ss.java:1682)
>>>> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClas
>>>> s.java:1254)
>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>> am.java:2078)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
>>>> m.java:2287)
>>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.
>>>> java:2211)
>>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
>>>> am.java:2069)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
>>>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.deserializeObject(In
>>>> stantiationUtil.java:290)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil.readObjectFromConfig
>>>> (InstantiationUtil.java:248)
>>>> at
>>>> org.apache.flink.runtime.operators.util.TaskConfig.getStubWr
>>>> apper(TaskConfig.java:288)
>>>> ... 8 more
>>>> Caused by: java.lang.reflect.InvocationTargetException
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>> ssorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>> thodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at java.lang.invoke.SerializedLambda.readResolve(SerializedLamb
>>>> da.java:230)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
>>>> ssorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
>>>> thodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClas
>>>> s.java:1248)
>>>> ... 18 more
>>>> Caused by: java.lang.IllegalArgumentException: Invalid lambda
>>>> deserialization
>>>> at
>>>> org.myorg.quickstart.MergeTableSecond.$deserializeLambda$(Me
>>>> rgeTableSecond.java:41)
>>>> ... 28 more
>>>>
>>>>
>>>> Running Environment
>>>> Flink: 1.3.2
>>>> Java: openjdk version "1.8.0_151"
>>>>
>>>> Please help us resolve this issue.
>>>>
>>>>
>>>> --
>>>> Thanks,
>>>> Amit
>>>>
>>>>
>>>>
>

Reply via email to