Hi Ravi, The uber jar was correct, adding ClosureCleanerLevel to TOP_LEVEL resolved this issue. Thanks a lot.
Is there any disadvantage of explicitly setting this ? Regards, Vinay Patil On Sat, Jul 20, 2019 at 10:23 PM Ravi Bhushan Ratnakar < ravibhushanratna...@gmail.com> wrote: > Hi Vinay, > > ObjectNode seems ok as this is being used by flink provided > "JsonNodeDeserailizationSchema". > > Please verify that you are using maven dependency > "flink-connector-kinesis" 1.8.1 version (with your Flink 1.8.1 cluster) and > package this dependency as part of your application uber/fat jar. If you > are already doing this way then, please also try to set closure cleaner > level to "TOP_LEVEL" like below. > > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.getConfig.setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL) > > > Regards, > Ravi > > On Sat, Jul 20, 2019 at 1:53 PM Vinay Patil <vinay18.pa...@gmail.com> > wrote: > >> Hi Ravi, >> >> Tried with both new and legacy mode, it works locally but on cluster I am >> getting this exception, I am passing jackson ObjectNode class, should be >> serializable. What do you think? >> >> On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, < >> ravibhushanratna...@gmail.com> wrote: >> >>> Hi Vinay, >>> >>> Please make sure that all your custom code is serializable. You can run >>> this using new mode. >>> >>> Thanks, >>> Ravi >>> >>> On Sat 20 Jul, 2019, 08:13 Vinay Patil, <vinay18.pa...@gmail.com> wrote: >>> >>>> Hi, >>>> >>>> I am trying to run a pipeline on Flink 1.8.1 ,getting the following >>>> exception: >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> *java.lang.StackOverflowError at >>>> java.lang.Exception.<init>(Exception.java:66) at >>>> java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56) >>>> at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51) at >>>> java.lang.Class.getDeclaredMethod(Class.java:2130) at >>>> org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115) >>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)* >>>> >>>> I have even tried running in legacy mode, the pipeline code is : >>>> >>>> private void execute(String[] args) { >>>> ParameterTool pt = ParameterTool.fromArgs(args); >>>> >>>> StreamExecutionEnvironment env = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> //env.setMaxParallelism(30); >>>> env.setParallelism(20); >>>> >>>> env.enableCheckpointing(5000); >>>> StateBackend backend = new >>>> FsStateBackend(pt.getRequired("checkpoint_path"), true); >>>> env.setStateBackend(backend); >>>> >>>> FlinkDynamoDBStreamsConsumer<ObjectNode> >>>> flinkDynamoDBStreamsConsumer = >>>> new >>>> FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME, new >>>> JsonNodeDeserializationSchema(), >>>> dynamodbStreamsConsumerConfig); >>>> >>>> SingleOutputStreamOperator<ObjectNode> sourceStream = env >>>> .addSource(flinkDynamoDBStreamsConsumer) >>>> .name("Dynamo DB Streams"); >>>> >>>> sourceStream >>>> .keyBy(new CdcKeySelector()) >>>> .addSink(new >>>> FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092", "ums-dynamo-streams", >>>> new JsonSerializationSchema())) >>>> .name("Kafka Sink"); >>>> >>>> try { >>>> env.execute(); >>>> } catch (Exception e) { >>>> System.out.println("Caught exception for pipeline" + >>>> e.getMessage()); >>>> e.printStackTrace(); >>>> } >>>> } >>>> >>>> Regards, >>>> Vinay Patil >>>> >>>