[ https://issues.apache.org/jira/browse/SPARK-29372?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946036#comment-16946036 ]
Andrew Malone Melo commented on SPARK-29372:
--------------------------------------------

Coincidentally enough, I just logged into Jira to report the same issue with 2.4.4 (also with a custom DSv2 implementation). The issue appears to come from the generated code enumerating each column individually in processNext():
{quote}
/* 090 */ private void datasourcev2scan_nextBatch_0() throws java.io.IOException {
/* 091 */   long getBatchStart = System.nanoTime();
/* 092 */   if (datasourcev2scan_mutableStateArray_0[0].hasNext()) {
/* 093 */     datasourcev2scan_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch) datasourcev2scan_mutableStateArray_0[0].next();
/* 094 */     ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(datasourcev2scan_mutableStateArray_1[0].numRows());
/* 095 */     datasourcev2scan_batchIdx_0 = 0;
/* 096 */     datasourcev2scan_mutableStateArray_2[0] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(0);
/* 097 */     datasourcev2scan_mutableStateArray_2[1] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(1);
/* 098 */     datasourcev2scan_mutableStateArray_2[2] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(2);
<snip>
/* 993 */     datasourcev2scan_mutableStateArray_2[897] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(897);
/* 994 */     datasourcev2scan_mutableStateArray_2[898] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(898);
/* 995 */     datasourcev2scan_mutableStateArray_2[899] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(899);
/* 996 */     datasourcev2scan_mutableStateArray_2[900] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(900);
/* 997 */     datasourcev2scan_mutableStateArray_2[901] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(901);
/* 998 */     datasourcev2scan_mutableStateArray_2[902] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(902);
/* 999 */     datasourcev2scan_mutableStateArray_2[903] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(903);
/* 1000 */    datasourcev2scan_mutableStateArray_2[904] = (org.apache.spark.sql.vectorized.ColumnVector) datasourcev2scan_mutableStateArray_1[0].column(904);
/* 1001 */ [truncated to 1000 lines (total lines is 12533)]
{quote}

> Codegen grows beyond 64 KB for more columns in case of SupportsScanColumnarBatch
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-29372
>                 URL: https://issues.apache.org/jira/browse/SPARK-29372
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core, SQL
>    Affects Versions: 2.3.2
>            Reporter: Shubham Chaurasia
>            Priority: Critical
>
> In the case of vectorized DSv2 readers, i.e. when the reader implements {{SupportsScanColumnarBatch}} and the number of columns is around (or greater than) 1000, it throws:
> {code:java}
> Caused by: org.codehaus.janino.InternalCompilerException: Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage0" grows beyond 64 KB
>     at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:990)
>     at org.codehaus.janino.CodeContext.write(CodeContext.java:899)
>     at org.codehaus.janino.CodeContext.writeBranch(CodeContext.java:1016)
>     at org.codehaus.janino.UnitCompiler.writeBranch(UnitCompiler.java:11911)
>     at org.codehaus.janino.UnitCompiler.compileBoolean2(UnitCompiler.java:3675)
>     at org.codehaus.janino.UnitCompiler.access$5500(UnitCompiler.java:212)
> {code}
> I can see from the logs that it tries to disable whole-stage codegen, but it keeps failing even after that on each retry.
> {code}
> 19/10/07 20:49:35 WARN WholeStageCodegenExec: Whole-stage codegen disabled for plan (id=0):
> *(0) DataSourceV2Scan [column_0#3558, column_1#3559, column_2#3560, column_3#3561, column_4#3562, column_5#3563, column_6#3564, column_7#3565, column_8#3566, column_9#3567, column_10#3568, column_11#3569, column_12#3570, column_13#3571, column_14#3572, column_15#3573, column_16#3574, column_17#3575, column_18#3576, column_19#3577, column_20#3578, column_21#3579, column_22#3580, column_23#3581, ... 976 more fields], com.shubham.reader.MyDataSourceReader@5c7673b8
> {code}
> Repro code for a simple reader can be:
> {code:java}
> public class MyDataSourceReader implements DataSourceReader, SupportsScanColumnarBatch {
>
>   private StructType schema;
>   private int numCols = 10;
>   private int numRows = 10;
>   private int numReaders = 1;
>
>   public MyDataSourceReader(Map<String, String> options) {
>     initOptions(options);
>     System.out.println("MyDataSourceReader.MyDataSourceReader: Instantiated...." + this);
>   }
>
>   private void initOptions(Map<String, String> options) {
>     String numColumns = options.get("num_columns");
>     if (numColumns != null) {
>       numCols = Integer.parseInt(numColumns);
>     }
>     String numRowsOption = options.get("num_rows_per_reader");
>     if (numRowsOption != null) {
>       numRows = Integer.parseInt(numRowsOption);
>     }
>     String readersOption = options.get("num_readers");
>     if (readersOption != null) {
>       numReaders = Integer.parseInt(readersOption);
>     }
>   }
>
>   @Override public StructType readSchema() {
>     final String colPrefix = "column_";
>     StructField[] fields = new StructField[numCols];
>     for (int i = 0; i < numCols; i++) {
>       fields[i] = new StructField(colPrefix + i, DataTypes.IntegerType, true, Metadata.empty());
>     }
>     schema = new StructType(fields);
>     return schema;
>   }
>
>   @Override public List<DataReaderFactory<ColumnarBatch>> createBatchDataReaderFactories() {
>     System.out.println("MyDataSourceReader.createDataReaderFactories: " + numReaders);
>     return new ArrayList<>();
>   }
> }
> {code}
> If I pass {{num_columns}} 1000 or greater, the issue appears.
> {code:java}
> spark.read.format("com.shubham.MyDataSource").option("num_columns", "1000").option("num_rows_per_reader", 2).option("num_readers", 1).load.show
> {code}
> Any fixes/workarounds for this?
> SPARK-16845 and SPARK-17092 are resolved, but it looks like they don't deal with the vectorized part.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
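For readers hitting this, the mitigations commonly suggested for 64 KB codegen failures are the configs below (all real Spark SQL options). Note the hedge: the WARN log above shows the scan failing even after whole-stage codegen is disabled, so these may not be sufficient for this particular vectorized-scan case; they are listed as things to try, not a confirmed fix.

```shell
# Commonly suggested mitigations for "grows beyond 64 KB" codegen errors
# (effectiveness varies by Spark version and may not resolve this case):
spark-shell \
  --conf spark.sql.codegen.wholeStage=false \
  --conf spark.sql.codegen.maxFields=100
# spark.sql.codegen.maxFields is the column-count threshold above which
# whole-stage codegen is deactivated for a plan (default 100).
```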
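To make the failure mode concrete: the JVM caps any single method body at 64 KB of bytecode, and the dump above emits one `column(i)` assignment per column into a single method, so ~1000 columns blows the cap. The usual remedy inside a code generator is to split such statement runs into fixed-size helper methods (the idea behind Spark's `CodeGenerator.splitExpressions` for row-based codegen). The sketch below is illustrative only, not Spark's actual generator; all names (`SplitCodegen`, `nextBatch_N`, `colVectors`) are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch, NOT Spark's generator: split a long run of generated
// statements into helper methods so no single method exceeds the JVM's
// 64 KB bytecode limit.
public class SplitCodegen {

    /** Groups statements into helper methods of at most chunkSize statements
     *  and returns the generated source: the helpers plus a driver method
     *  that calls each helper in order. */
    static String generate(List<String> statements, int chunkSize) {
        StringBuilder helpers = new StringBuilder();
        StringBuilder driver = new StringBuilder("private void nextBatch() {\n");
        int methodId = 0;
        for (int i = 0; i < statements.size(); i += chunkSize) {
            String name = "nextBatch_" + methodId++;
            helpers.append("private void ").append(name).append("() {\n");
            int end = Math.min(i + chunkSize, statements.size());
            for (int j = i; j < end; j++) {
                helpers.append("  ").append(statements.get(j)).append("\n");
            }
            helpers.append("}\n");
            driver.append("  ").append(name).append("();\n");
        }
        driver.append("}\n");
        return helpers.append(driver).toString();
    }

    public static void main(String[] args) {
        // 905 column assignments, mirroring the column(0)..column(904) run above.
        List<String> stmts = new ArrayList<>();
        for (int c = 0; c <= 904; c++) {
            stmts.add("colVectors[" + c + "] = batch.column(" + c + ");");
        }
        String src = generate(stmts, 100);
        // 905 statements in chunks of 100 -> 10 helper methods.
        System.out.println(src.split("private void nextBatch_", -1).length - 1);
    }
}
```

Each helper stays well under the bytecode cap, and the driver method contains only ~10 call instructions instead of ~900 assignments.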