[ 
https://issues.apache.org/jira/browse/FLINK-17923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17118460#comment-17118460
 ] 

Xintong Song edited comment on FLINK-17923 at 5/28/20, 8:47 AM:
----------------------------------------------------------------

[~dian.fu], [~sunjincheng121]

I'm not familiar with the flink-python code base, thus I cannot speak much to 
the PR.

My only concern is about automatically setting task.off-heap.size for 
users. I wonder whether we are trying to be a bit too smart. It might save 
users some effort in many cases, but it could also make things hard to 
understand in other cases. One of the main motivations for FLIP-49 was to make 
sure all the memory calculations happen in one place, without this kind of 
implicit logic.

I would suggest not overriding the task.off-heap.size configuration. Instead, 
we can explain how to set this option in both the memory configuration docs 
and the Python UDF docs. This is similar to RocksDBStateBackend: when managed 
memory is disabled, users need to explicitly make sure enough native memory is 
reserved for RocksDB.
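
A minimal sketch of what such documentation could show, assuming the full 
option key is taskmanager.memory.task.off-heap.size and using made-up sizes. 
The helper name off_heap_conf_line is hypothetical and not part of Flink or 
PyFlink; it only renders the flink-conf.yaml line a user would set by hand.

```python
# Hypothetical helper, not part of Flink or PyFlink.
# Assumption: the full key for task.off-heap.size is the one below.
OFF_HEAP_KEY = "taskmanager.memory.task.off-heap.size"


def off_heap_conf_line(python_worker_mb: int, other_native_mb: int = 0) -> str:
    """Render a flink-conf.yaml line that reserves task off-heap memory.

    python_worker_mb: memory the user expects the Python UDF worker to need.
    other_native_mb: any other native memory used by the job's tasks.
    """
    if python_worker_mb < 0 or other_native_mb < 0:
        raise ValueError("memory sizes must be non-negative")
    total_mb = python_worker_mb + other_native_mb
    return f"{OFF_HEAP_KEY}: {total_mb}m"


# Illustrative figure only; the right value depends on the job.
print(off_heap_conf_line(python_worker_mb=512))
```

The point of the sketch is that the user states the value explicitly, so the 
resulting memory budget is visible in one place instead of being derived 
implicitly.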

WDYT?



> It will throw MemoryAllocationException if rocksdb statebackend and Python 
> UDF are used in the same slot  
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-17923
>                 URL: https://issues.apache.org/jira/browse/FLINK-17923
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python, Runtime / State Backends
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Dian Fu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> For the following job:
> {code}
> import logging
> import os
> import shutil
> import sys
> import tempfile
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
> from pyflink.table.udf import udf
> def word_count():
>     content = "line Licensed to the Apache Software Foundation ASF under one " \
>               "line or more contributor license agreements See the NOTICE file " \
>               "line distributed with this work for additional information " \
>               "line regarding copyright ownership The ASF licenses this file " \
>               "to you under the Apache License Version the " \
>               "License you may not use this file except in compliance " \
>               "with the License"
>     t_config = TableConfig()
>     env = StreamExecutionEnvironment.get_execution_environment()
>     t_env = StreamTableEnvironment.create(env, t_config)
>     # register Results table in table environment
>     tmp_dir = tempfile.gettempdir()
>     result_path = tmp_dir + '/result'
>     if os.path.exists(result_path):
>         try:
>             if os.path.isfile(result_path):
>                 os.remove(result_path)
>             else:
>                 shutil.rmtree(result_path)
>         except OSError as e:
>             logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)
>     logging.info("Results directory: %s", result_path)
>     sink_ddl = """
>         create table Results(
>             word VARCHAR,
>             `count` BIGINT
>         ) with (
>             'connector' = 'blackhole'
>         )
>         """
>     t_env.sql_update(sink_ddl)
>     @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
>     def inc(count):
>         return count + 1
>     t_env.register_function("inc", inc)
>     elements = [(word, 1) for word in content.split(" ")]
>     t_env.from_elements(elements, ["word", "count"]) \
>          .group_by("word") \
>          .select("word, count(1) as count") \
>          .select("word, inc(count) as count") \
>          .insert_into("Results")
>     t_env.execute("word_count")
> if __name__ == '__main__':
>     logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
>     word_count()
> {code}
> It will throw the following exception if the RocksDB state backend is used:
> {code}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
>       ... 9 more
> Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
>       at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
>       at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>       at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>       ... 11 more
> Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
>       at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
>       at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
>       at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
>       at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
>       at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
>       at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
>       ... 15 more
> Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
>       at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
>       at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
>       ... 20 more
> {code}
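
The two byte counts in the innermost exception can be read as a simple 
shortfall. The sketch below only does arithmetic on the figures quoted in the 
trace; the helper name split_mib is made up for illustration. The trace does 
not say what consumed the missing memory; going by the issue title, the Python 
UDF sharing the slot is the presumed consumer.

```python
MIB = 1024 * 1024

# Both figures are copied from the MemoryReservationException above.
requested = 536870920  # bytes RocksDB tried to reserve
remaining = 454033416  # bytes still free in the slot's managed memory


def split_mib(num_bytes: int) -> tuple:
    """Split a byte count into whole MiB and leftover bytes."""
    return num_bytes // MIB, num_bytes % MIB


shortfall = requested - remaining

print("requested: %d MiB + %d bytes" % split_mib(requested))  # 512 MiB + 8 bytes
print("remaining: %d MiB + %d bytes" % split_mib(remaining))  # 433 MiB + 8 bytes
print("shortfall: %d MiB + %d bytes" % split_mib(shortfall))  # 79 MiB + 0 bytes
```

So the reservation of roughly 512 MiB fails because about 79 MiB of the slot's 
managed memory is already taken by the time RocksDB asks for its share.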



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
