Fanbin Bu created FLINK-15928:
---------------------------------

             Summary: Batch mode in blink planner caused IndexOutOfBoundsException error
                 Key: FLINK-15928
                 URL: https://issues.apache.org/jira/browse/FLINK-15928
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.9.2
            Reporter: Fanbin Bu


Flink version: 1.9.2

Mode: batch mode, running on EMR with YARN

Details follow.

 

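Table source sample:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.io.jdbc.split.GenericParameterValuesProvider
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.table.api.TableSchema
import org.apache.flink.table.sources.StreamTableSource
import org.apache.flink.types.Row

// SnowflakeOptions, getInputRowTypeInfo and buildQueryParams are defined
// elsewhere in the reporter's code and are not shown here.
class SnowflakeTableSource(
    val schema: TableSchema,
    val parallelism: Int,
    val fetchSize: Int,
    val query: String,
    val options: SnowflakeOptions)
  extends StreamTableSource[Row] {

  override def getDataStream(execEnv: StreamExecutionEnvironment): SingleOutputStreamOperator[Row] = {
    execEnv.createInput(getInputFormat, getReturnType).name("app_event_stream")
  }

  override def getReturnType: TypeInformation[Row] = schema.toRowType

  override def getTableSchema: TableSchema = schema

  // Bounded source, so the blink planner can use it in batch mode.
  override def isBounded: Boolean = true

  // JDBC input format that reads the query in parallel, one split per parameter set.
  private def getInputFormat: JDBCInputFormat = {
    JDBCInputFormat.buildJDBCInputFormat
      .setDrivername(options.driverName)
      .setDBUrl(options.dbUrl)
      .setUsername(options.username)
      .setPassword(options.password)
      .setQuery(query)
      .setRowTypeInfo(getInputRowTypeInfo)
      .setFetchSize(fetchSize)
      .setParametersProvider(new GenericParameterValuesProvider(buildQueryParams(parallelism)))
      .finish
  }
}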
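The report does not show buildQueryParams. As a purely hypothetical sketch (not the reporter's code), assuming the query contains two placeholders such as "... WHERE MOD(id, ?) = ?" (the id column is also an assumption), the helper could return one parameter array per split. JDBCInputFormat creates one input split per inner array and binds that array's values to the query's placeholders in order.

```scala
import java.io.Serializable

// Hypothetical helper: the original report calls buildQueryParams(parallelism)
// without showing it. Assumes a query like
//   SELECT ... FROM events WHERE MOD(id, ?) = ?
// so that each split reads one disjoint residue class of the (hypothetical) id column.
object QueryParams {
  def buildQueryParams(parallelism: Int): Array[Array[Serializable]] = {
    require(parallelism > 0, "parallelism must be positive")
    // One inner array per split; values are bound to the `?` placeholders in order:
    // first the modulus, then this split's shard index.
    Array.tabulate(parallelism) { shard =>
      Array[Serializable](Int.box(parallelism), Int.box(shard))
    }
  }
}
```

With parallelism = 4 this yields four parameter sets, (4, 0) through (4, 3), so the four splits together cover every row exactly once.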

 

Here is the sample setup code:

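import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
import org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()

val tableEnv = TableEnvironment.create(settings)
val configurations = tableEnv.getConfig.getConfiguration

// Hash aggregate memory as a size string in mb (Globals is the reporter's own settings object).
configurations.setString(
  TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY.key,
  s"${Globals.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY} mb")

tableEnv.registerTableSource(tableName, tableSource)

val queryResult = tableEnv.sqlQuery(sql)

// In Flink 1.9, TableEnvironment.execute takes a job name; "job-name" is a placeholder.
tableEnv.execute("job-name")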

 

Here is the sample SQL:

select
  ip_address
  , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
  , sum(case when name = 'signin' then 1 else 0 end) as signin_count_1m
  , sum(case when name = 'signin_failure' then 1 else 0 end) as signin_failure_count_1m
  ...
from events
group by
  ip_address
  , hop(created_at, interval '30' second, interval '1' minute)
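In this query, hop(created_at, interval '30' second, interval '1' minute) defines hopping windows that slide every 30 seconds and span 1 minute, and hop_end returns each window's exclusive upper bound. The following is a minimal plain-Scala sketch (no Flink dependency, and not Flink's implementation) of which windows a single event falls into, assuming epoch-millisecond event times and a zero window offset.

```scala
// Plain-Scala sketch of hopping-window assignment.
// Assumptions: epoch-millisecond event times and a zero window offset.
object HopWindows {
  // Window covers [start, end); end is exclusive, which is what hop_end reports.
  final case class Window(start: Long, end: Long)

  def assign(ts: Long, slideMs: Long, sizeMs: Long): List[Window] = {
    require(slideMs > 0 && sizeMs > 0, "slide and size must be positive")
    // Latest slide-aligned window start at or before ts.
    val lastStart = ts - Math.floorMod(ts, slideMs)
    // Step back one slide at a time while the window still contains ts.
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(start => start > ts - sizeMs)
      .map(start => Window(start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Slide 30 s, size 60 s, as in the query above.
    println(assign(45000L, 30000L, 60000L))
    // prints List(Window(30000,90000), Window(0,60000))
  }
}
```

Because the window size is twice the slide, every event lands in exactly two windows, so each row contributes to two bucket_ts values.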

 

Here is the stacktrace:

java.lang.IndexOutOfBoundsException
    at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
    at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
    at HashWinAggWithKeys$538.endInput(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

 

The same code works with other SQL queries, and given the stack trace, I suspect this is related to a memory issue. The error occurs only with the blink planner in batch mode; the same query works with BatchTableEnvironment in the old planner.

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
