Hi Ravi,

The uber jar was correct, adding ClosureCleanerLevel to TOP_LEVEL resolved
this issue. Thanks a lot.

Is there any disadvantage of explicitly setting this ?


Regards,
Vinay Patil


On Sat, Jul 20, 2019 at 10:23 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> Hi Vinay,
>
> ObjectNode seems ok as this is being used by flink provided
> "JsonNodeDeserailizationSchema".
>
> Please verify that you are using maven dependency
> "flink-connector-kinesis" 1.8.1 version (with your Flink 1.8.1 cluster) and
> package this dependency as part of your application uber/fat jar. If you
> are already doing this way then, please also try to set closure cleaner
> level to "TOP_LEVEL" like below.
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>
> env.getConfig.setClosureCleanerLevel(ExecutionConfig.ClosureCleanerLevel.TOP_LEVEL)
>
>
> Regards,
> Ravi
>
> On Sat, Jul 20, 2019 at 1:53 PM Vinay Patil <vinay18.pa...@gmail.com>
> wrote:
>
>> Hi Ravi,
>>
>> Tried with both new and legacy mode, it works locally but on cluster I am
>> getting this exception, I am passing jackson ObjectNode class, should be
>> serializable. What do you think?
>>
>> On Sat, 20 Jul 2019, 12:11 Ravi Bhushan Ratnakar, <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi Vinay,
>>>
>>> Please make sure that all your custom code is serializable. You can run
>>> this using new mode.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On Sat 20 Jul, 2019, 08:13 Vinay Patil, <vinay18.pa...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am trying to run a pipeline on Flink 1.8.1 ,getting the following
>>>> exception:
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> *java.lang.StackOverflowError at
>>>> java.lang.Exception.<init>(Exception.java:66) at
>>>> java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:56)
>>>> at java.lang.NoSuchMethodException.<init>(NoSuchMethodException.java:51) at
>>>> java.lang.Class.getDeclaredMethod(Class.java:2130) at
>>>> org.apache.flink.api.java.ClosureCleaner.usesCustomSerialization(ClosureCleaner.java:153)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:78)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)
>>>> at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:115)*
>>>>
>>>> I have even tried running in legacy mode, the pipeline code is :
>>>>
>>>> private void execute(String[] args) {
>>>>         ParameterTool pt = ParameterTool.fromArgs(args);
>>>>
>>>>         StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         //env.setMaxParallelism(30);
>>>>         env.setParallelism(20);
>>>>
>>>>         env.enableCheckpointing(5000);
>>>>         StateBackend backend = new
>>>> FsStateBackend(pt.getRequired("checkpoint_path"), true);
>>>>         env.setStateBackend(backend);
>>>>
>>>>         FlinkDynamoDBStreamsConsumer<ObjectNode>
>>>> flinkDynamoDBStreamsConsumer =
>>>>                 new
>>>> FlinkDynamoDBStreamsConsumer<>(DYNAMODB_STREAM_NAME, new
>>>> JsonNodeDeserializationSchema(),
>>>>                         dynamodbStreamsConsumerConfig);
>>>>
>>>>         SingleOutputStreamOperator<ObjectNode> sourceStream = env
>>>>                 .addSource(flinkDynamoDBStreamsConsumer)
>>>>                 .name("Dynamo DB Streams");
>>>>
>>>>         sourceStream
>>>>                 .keyBy(new CdcKeySelector())
>>>>                 .addSink(new
>>>> FlinkKafkaProducer<>("dev-broker.hotstar.npe:9092", "ums-dynamo-streams",
>>>>                         new JsonSerializationSchema()))
>>>>                 .name("Kafka Sink");
>>>>
>>>>         try {
>>>>             env.execute();
>>>>         } catch (Exception e) {
>>>>             System.out.println("Caught exception for pipeline" +
>>>> e.getMessage());
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>>
>>>> Regards,
>>>> Vinay Patil
>>>>
>>>

Reply via email to