Hi Amit,

which of the two lambdas caused the error? I guess it was the mapper after the parquet input, right? In both cases this should not happen. Maybe you can open an issue with a small reproducible code example?

Thanks.

Regards,
Timo


Am 1/3/18 um 12:15 PM schrieb Amit Jain:
Hi Timo,

Thanks a lot! Quick re-look over the code helped me to detect used lambdas.
I was using lambdas in two cases which are following.

DataSet<GenericRecord> newMainDataSet = mainDataSet

     .fullOuterJoin(latestDeltaDataSet, JoinHint.REPARTITION_HASH_SECOND)
     .where(keySelector).equalTo(keySelector)

*    .with((first, second) -> first != null && second != null ? second
: (first != null ? first : second))*    .filter(filterFunction)
     .returns(GenericRecord.class);

DataSet<GenericRecord> mainDataSet =

     mergeTableSecond.readParquet(mainPath, avroSchema, env)
         .withParameters(parameters)
*        .map(**t -> t.f1*
*)*        .returns(GenericRecord.class);



On Wed, Jan 3, 2018 at 4:18 PM, Timo Walther <twal...@apache.org> wrote:

Hi Amit,

are you using lambdas as parameters of a Flink Function or in a member
variable? If yes, can you share an lambda example that fails?

Regards,
Timo


Am 1/3/18 um 11:41 AM schrieb Amit Jain:

Hi,

I'm writing a job to merge old data with changelogs using DataSet API
where
I'm reading changelog using TextInputFormat and old data using
HadoopInputFormat.

I can see, job manager has successfully deployed the program flow to
worker
nodes. However, workers are immediately going to failed state because of
*Caused by: java.lang.IllegalArgumentException: Invalid lambda
deserialization*


Complete stack trace
java.lang.RuntimeException: The initialization of the DataSource's outputs
caused an error: Could not read the user code wrapper: unexpected
exception
type
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(
DataSourceTask.java:94)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by:
org.apache.flink.runtime.operators.util.CorruptConfigurationException:
Could not read the user code wrapper: unexpected exception type
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWr
apper(TaskConfig.java:290)
at
org.apache.flink.runtime.operators.BatchTask.instantiateUser
Code(BatchTask.java:1432)
at
org.apache.flink.runtime.operators.chaining.ChainedMapDriver
.setup(ChainedMapDriver.java:39)
at
org.apache.flink.runtime.operators.chaining.ChainedDriver.
setup(ChainedDriver.java:90)
at
org.apache.flink.runtime.operators.BatchTask.initOutputs(
BatchTask.java:1299)
at
org.apache.flink.runtime.operators.DataSourceTask.initOutput
s(DataSourceTask.java:287)
at
org.apache.flink.runtime.operators.DataSourceTask.invoke(
DataSourceTask.java:91)
... 2 more
Caused by: java.io.IOException: unexpected exception type
at java.io.ObjectStreamClass.throwMiscException(ObjectStreamCla
ss.java:1682)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClas
s.java:1254)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
am.java:2078)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStrea
m.java:2287)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStre
am.java:2069)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:433)
at
org.apache.flink.util.InstantiationUtil.deserializeObject(In
stantiationUtil.java:290)
at
org.apache.flink.util.InstantiationUtil.readObjectFromConfig
(InstantiationUtil.java:248)
at
org.apache.flink.runtime.operators.util.TaskConfig.getStubWr
apper(TaskConfig.java:288)
... 8 more
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
ssorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
thodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.lang.invoke.SerializedLambda.readResolve(SerializedLamb
da.java:230)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAcce
ssorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMe
thodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClas
s.java:1248)
... 18 more
Caused by: java.lang.IllegalArgumentException: Invalid lambda
deserialization
at
org.myorg.quickstart.MergeTableSecond.$deserializeLambda$(Me
rgeTableSecond.java:41)
... 28 more


Running Environment
Flink: 1.3.2
Java: openjdk version "1.8.0_151"

Please help us resolve this issue.


--
Thanks,
Amit



Reply via email to