I've double-checked, and I think CollectSinkOperatorFactory is
initialized in DataStream.collectAsync without the MAX_BATCH_SIZE
and SOCKET_TIMEOUT values coming from the Flink config.
Could you please share the whole stack trace so I can double-check my assumption?
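
For reference, this is roughly how I remember the construction looking in
DataStream.collectAsync (paraphrased from memory, not the exact Flink source),
which would explain why the config value never reaches the sink:

```
// Sketch of the relevant part of DataStream.collectAsync() in Flink 1.15.x
// (paraphrased, not the exact source). The two-argument
// CollectSinkOperatorFactory constructor falls back to
// MAX_BATCH_SIZE.defaultValue() and SOCKET_TIMEOUT.defaultValue() instead of
// reading "collect-sink.batch-size.max" from the job configuration.
TypeSerializer<T> serializer =
    getType().createSerializer(getExecutionEnvironment().getConfig());
String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
CollectSinkOperatorFactory<T> factory =
    new CollectSinkOperatorFactory<>(serializer, accumulatorName);
```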

G


On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <salcantara...@gmail.com>
wrote:

> Hi all,
>
> Just to share my findings so far. Regarding tweaking the setting, it has
> been impossible for me to do so; the only way to work around this has been
> to duplicate some Flink code directly so that I can apply the tweak. More
> precisely, this is what my code looks like now (kudos to my dear colleague
> Den!):
>
> ```
>   private static <T> List<T> executeAndCollect(
>       DataStream<T> dataStream,
>       StreamExecutionEnvironment env,
>       String maxBatchSize,
>       int limit) throws Exception {
>
>     TypeSerializer<T> serializer =
>         dataStream.getType().createSerializer(env.getConfig());
>     String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>
>     // Same as DataStream.collectAsync(), but with a configurable max batch
>     // size. SOCKET_TIMEOUT comes from CollectSinkOperatorFactory (static import).
>     CollectSinkOperatorFactory<T> factory =
>         new CollectSinkOperatorFactory<>(
>             serializer,
>             accumulatorName,
>             MemorySize.parse(maxBatchSize),
>             SOCKET_TIMEOUT.defaultValue());
>     CollectSinkOperator<T> operator =
>         (CollectSinkOperator<T>) factory.getOperator();
>     CollectResultIterator<T> iterator =
>         new CollectResultIterator<>(
>             operator.getOperatorIdFuture(),
>             serializer,
>             accumulatorName,
>             env.getCheckpointConfig());
>     CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream, factory);
>     sink.name("Data stream collect sink");
>     env.addOperator(sink.getTransformation());
>
>     final JobClient jobClient = env.executeAsync("DataStream Collect");
>     iterator.setJobClient(jobClient);
>
>     var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
>     List<T> results = new ArrayList<>(limit);
>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>       results.add(clientAndIterator.iterator.next());
>       limit--;
>     }
>     return results;
>   }
> ```
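>
> For context, this is how we call the helper from our savepoint-reading job
> (the "Raw Triggers" uid and MatcherReader are specific to our job, so treat
> the call site as illustrative only):
>
> ```
> // Illustrative call site; the uid, reader, and batch size are ours.
> var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
> var matcherState = executeAndCollect(matcher, env, "100mb", 1000);
> ```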
>
> Essentially, I'm just adding a parameter to the CollectSinkOperatorFactory
> constructor here:
> -
> https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
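>
> Concretely, the difference boils down to which CollectSinkOperatorFactory
> constructor gets called (just a sketch, reusing the names from the code
> above):
>
> ```
> // What DataStream.collectAsync() currently does: defaults only (2 MiB max batch size).
> new CollectSinkOperatorFactory<>(serializer, accumulatorName);
>
> // What we need: a way to pass in the max batch size (and, optionally, the socket timeout).
> new CollectSinkOperatorFactory<>(
>     serializer, accumulatorName, MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
> ```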
>
> This works, but it's obviously inconvenient for the user. If this
> limitation is confirmed, Den and I will be glad to send an MR to fix it.
>
> Does that make sense?
>
> Regards,
>
> Salva
>
> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> The same happens with this slight variation:
>>
>> ```
>> Configuration config = new Configuration();
>> config.setString("collect-sink.batch-size.max", "100mb");
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> env.configure(config);
>> SavepointReader savepoint =
>>     SavepointReader.read(env, savepointPath, new HashMapStateBackend());
>> ```
>>
>> Salva
>>
>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Hi Zhanghao,
>>>
>>> Thanks for your suggestion. Unfortunately, this does not work; I still
>>> get the same error message:
>>>
>>> ```
>>> Record size is too large for CollectSinkFunction. Record size is 9623137
>>> bytes, but max bytes per batch is only 2097152 bytes.
>>> Please consider increasing max bytes per batch value by setting
>>> collect-sink.batch-size.max
>>> ```
>>>
>>> The code looks like this now:
>>>
>>> ```
>>> Configuration config = new Configuration();
>>> config.setString("collect-sink.batch-size.max", "10mb");
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>> SavepointReader savepoint =
>>>     SavepointReader.read(env, savepointPath, new HashMapStateBackend());
>>>
>>> var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
>>> var matcherState = matcher.executeAndCollect(1000);
>>> ```
>>>
>>> I have tried other approaches as well, but none of them worked (the
>>> setting is always ignored in the end).
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>>
>>>
>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <zhanghao.c...@outlook.com>
>>> wrote:
>>>
>>>> Hi, you could increase it as follows:
>>>>
>>>> Configuration config = new Configuration();
>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>> ------------------------------
>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>> *To:* user <user@flink.apache.org>
>>>> *Subject:* SavepointReader: Record size is too large for
>>>> CollectSinkFunction
>>>>
>>>> Hi all!
>>>>
>>>> I'm trying to debug a job by inspecting its savepoints, but I'm getting
>>>> this error message:
>>>>
>>>> ```
>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
>>>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>>>> by setting collect-sink.batch-size.max
>>>> ```
>>>>
>>>> My code looks like this:
>>>>
>>>> ```
>>>>   private static void run(String savepointPath) throws Exception {
>>>>     StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>         org.apache.flink.state.api.SavepointReader.read(
>>>>             env, savepointPath, new HashMapStateBackend());
>>>>
>>>>     var operator =
>>>>         savepoint.readKeyedState("uuid", new MyKeyedOperatorReader());
>>>>     var operatorState = operator.executeAndCollect(1000);
>>>>   }
>>>> ```
>>>>
>>>> I haven't found a way to increase `collect-sink.batch-size.max` as
>>>> suggested in the error message.
>>>>
>>>> Any help on this will be appreciated!
>>>>
>>>> Regards,
>>>>
>>>> Salva
>>>>
>>>
