I've double-checked, and I think CollectSinkOperatorFactory is initialized in DataStream.collectAsync with the MAX_BATCH_SIZE and SOCKET_TIMEOUT defaults rather than the values from the Flink config. Could you please share the whole stack trace so I can double-check my assumption?
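If that's confirmed, the fix I have in mind would look roughly like this (just a sketch, not verified against the actual collectAsync code; MAX_BATCH_SIZE and SOCKET_TIMEOUT are the ConfigOptions declared on CollectSinkOperatorFactory, and I'm assuming the effective ReadableConfig is available at that point):

```
// Sketch of the hypothesized fix: read the limits from the effective
// configuration instead of hardcoding the defaults in collectAsync.
static <T> CollectSinkOperatorFactory<T> factoryFromConfig(
        TypeSerializer<T> serializer,
        String accumulatorName,
        ReadableConfig config) {
    return new CollectSinkOperatorFactory<>(
            serializer,
            accumulatorName,
            config.get(CollectSinkOperatorFactory.MAX_BATCH_SIZE),
            config.get(CollectSinkOperatorFactory.SOCKET_TIMEOUT));
}
```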
On Tue, Jul 23, 2024 at 12:46 PM Salva Alcántara <salcantara...@gmail.com> wrote:

> Hi all,
>
> Just to share my findings so far. Regarding tweaking the setting, it has
> been impossible for me to do so. So the only way to work around this has
> been to duplicate some Flink code directly to allow me to make the tweak.
> More precisely, this is how my code looks now (kudos to my dear colleague
> Den!):
>
> ```
> private static <T> List<T> executeAndCollect(DataStream<T> dataStream,
>                                              StreamExecutionEnvironment env,
>                                              String maxBatchSize,
>                                              int limit) throws Exception {
>
>     TypeSerializer<T> serializer =
>         dataStream.getType().createSerializer(env.getConfig());
>     String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
>
>     CollectSinkOperatorFactory<T> factory =
>         new CollectSinkOperatorFactory<>(serializer, accumulatorName,
>             MemorySize.parse(maxBatchSize), SOCKET_TIMEOUT.defaultValue());
>     CollectSinkOperator<T> operator =
>         (CollectSinkOperator<T>) factory.getOperator();
>     CollectResultIterator<T> iterator =
>         new CollectResultIterator<>(
>             operator.getOperatorIdFuture(),
>             serializer,
>             accumulatorName,
>             env.getCheckpointConfig());
>     CollectStreamSink<T> sink = new CollectStreamSink<>(dataStream, factory);
>     sink.name("Data stream collect sink");
>     env.addOperator(sink.getTransformation());
>
>     final JobClient jobClient = env.executeAsync("DataStream Collect");
>     iterator.setJobClient(jobClient);
>
>     var clientAndIterator = new ClientAndIterator<>(jobClient, iterator);
>     List<T> results = new ArrayList<>(limit);
>     while (limit > 0 && clientAndIterator.iterator.hasNext()) {
>         results.add(clientAndIterator.iterator.next());
>         limit--;
>     }
>     return results;
> }
> ```
>
> Essentially, I'm just adding a parameter to the CollectSinkOperatorFactory
> constructor here:
> - https://github.com/apache/flink/blob/release-1.15.4/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java#L1378
>
> This works, but it's obviously inconvenient for the user. If this
> limitation is confirmed, Den and I will be glad to send an MR to fix that.
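> For completeness, this is how the helper gets invoked in our test (just a
> sketch; "Raw Triggers", MatcherReader, and the 100mb value come from our
> setup):
>
> ```
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> SavepointReader savepoint =
>     SavepointReader.read(env, savepointPath, new HashMapStateBackend());
>
> var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
> var matcherState = executeAndCollect(matcher, env, "100mb", 1000);
> ```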
>
> Makes sense?
>
> Regards,
>
> Salva
>
> On Mon, Jul 22, 2024 at 10:04 AM Salva Alcántara <salcantara...@gmail.com>
> wrote:
>
>> The same happens with this slight variation:
>>
>> ```
>> Configuration config = new Configuration();
>> config.setString("collect-sink.batch-size.max", "100mb");
>> StreamExecutionEnvironment env =
>>     StreamExecutionEnvironment.getExecutionEnvironment();
>> env.configure(config);
>> SavepointReader savepoint = SavepointReader.read(env, savepointPath,
>>     new HashMapStateBackend());
>> ```
>>
>> Salva
>>
>> On Mon, Jul 22, 2024 at 10:00 AM Salva Alcántara <salcantara...@gmail.com>
>> wrote:
>>
>>> Hi Zhanghao,
>>>
>>> Thanks for your suggestion. Unfortunately, it does not work; I still
>>> get the same error message:
>>>
>>> ```
>>> Record size is too large for CollectSinkFunction. Record size is 9623137
>>> bytes, but max bytes per batch is only 2097152 bytes.
>>> Please consider increasing max bytes per batch value by setting
>>> collect-sink.batch-size.max
>>> ```
>>>
>>> The code looks like this now:
>>>
>>> ```
>>> Configuration config = new Configuration();
>>> config.setString("collect-sink.batch-size.max", "10mb");
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>> SavepointReader savepoint = SavepointReader.read(env, savepointPath,
>>>     new HashMapStateBackend());
>>>
>>> var matcher = savepoint.readKeyedState("Raw Triggers", new MatcherReader());
>>> var matcherState = matcher.executeAndCollect(1000);
>>> ```
>>>
>>> I have tried other ways, but none has worked (the setting is always
>>> ignored in the end).
>>>
>>> Regards,
>>>
>>> Salva
>>>
>>> On Sun, Jul 21, 2024 at 9:10 AM Zhanghao Chen <zhanghao.c...@outlook.com>
>>> wrote:
>>>
>>>> Hi, you could increase it as follows:
>>>>
>>>> Configuration config = new Configuration();
>>>> config.setString("collect-sink.batch-size.max", "10mb");
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment(config);
>>>>
>>>> ------------------------------
>>>> *From:* Salva Alcántara <salcantara...@gmail.com>
>>>> *Sent:* Saturday, July 20, 2024 15:05
>>>> *To:* user <user@flink.apache.org>
>>>> *Subject:* SavepointReader: Record size is too large for
>>>> CollectSinkFunction
>>>>
>>>> Hi all!
>>>>
>>>> I'm trying to debug a job by inspecting its savepoints, but I'm getting
>>>> this error message:
>>>>
>>>> ```
>>>> Caused by: java.lang.RuntimeException: Record size is too large for
>>>> CollectSinkFunction. Record size is 9627127 bytes, but max bytes per batch
>>>> is only 2097152 bytes. Please consider increasing max bytes per batch value
>>>> by setting collect-sink.batch-size.max
>>>> ```
>>>>
>>>> My code looks like this:
>>>>
>>>> ```
>>>> private static void run(String savepointPath) throws Exception {
>>>>     StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     org.apache.flink.state.api.SavepointReader savepoint =
>>>>         org.apache.flink.state.api.SavepointReader.read(env,
>>>>             savepointPath, new HashMapStateBackend());
>>>>
>>>>     var operator = savepoint.readKeyedState("uuid",
>>>>         new MyKeyedOperatorReader());
>>>>     var operatorState = operator.executeAndCollect(1000);
>>>> }
>>>> ```
>>>>
>>>> I haven't found a way to increase `collect-sink.batch-size.max`
>>>> as suggested in the error message.
>>>>
>>>> Any help on this will be appreciated!
>>>>
>>>> Regards,
>>>>
>>>> Salva