[ https://issues.apache.org/jira/browse/FLINK-19739?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17388395#comment-17388395 ]
Jingsong Lee commented on FLINK-19739: -------------------------------------- [~TsReaper] Can you cherry-pick to 1.13? > CompileException when windowing in batch mode: A method named "replace" is > not declared in any enclosing class nor any supertype > --------------------------------------------------------------------------------------------------------------------------------- > > Key: FLINK-19739 > URL: https://issues.apache.org/jira/browse/FLINK-19739 > Project: Flink > Issue Type: Bug > Components: Table SQL / Runtime > Affects Versions: 1.11.2, 1.12.0 > Environment: Ubuntu 18.04 > Python 3.8, jar built from master yesterday. > Or Python 3.7, installed latest version from pip. > Reporter: Alex Hall > Assignee: Caizhi Weng > Priority: Minor > Labels: auto-deprioritized-major, auto-unassigned, > pull-request-available > > Example script: > {code:python} > from pyflink.table import EnvironmentSettings, BatchTableEnvironment > from pyflink.table.window import Tumble > env_settings = ( > > EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() > ) > table_env = BatchTableEnvironment.create(environment_settings=env_settings) > table_env.execute_sql( > """ > CREATE TABLE table1 ( > amount INT, > ts TIMESTAMP(3), > WATERMARK FOR ts AS ts - INTERVAL '5' SECOND > ) WITH ( > 'connector.type' = 'filesystem', > 'format.type' = 'csv', > 'connector.path' = '/home/alex/work/test-flink/data1.csv' > ) > """ > ) > table1 = table_env.from_path("table1") > table = ( > table1 > .window(Tumble.over("5.days").on("ts").alias("__window")) > .group_by("__window") > .select("amount.sum") > ) > print(table.to_pandas()) > {code} > Output: > {code} > WARNING: An illegal reflective access operation has occurred > WARNING: Illegal reflective access by > org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil > (file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar) > to constructor java.nio.DirectByteBuffer(long,int) > WARNING: Please consider reporting this to the maintainers of > org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil > WARNING: Use --illegal-access=warn to enable warnings of further illegal > reflective access operations > WARNING: All illegal access operations will be denied in a future release > /* 1 */ > /* 2 */ public class LocalHashWinAggWithoutKeys$59 extends > org.apache.flink.table.runtime.operators.TableStreamOperator > /* 3 */ implements > org.apache.flink.streaming.api.operators.OneInputStreamOperator, > org.apache.flink.streaming.api.operators.BoundedOneInput { > /* 4 */ > /* 5 */ private final Object[] references; > /* 6 */ > /* 7 */ private static final org.slf4j.Logger LOG$2 = > /* 8 */ org.slf4j.LoggerFactory.getLogger("LocalHashWinAgg"); > /* 9 */ > /* 10 */ private transient > org.apache.flink.table.types.logical.LogicalType[] aggMapKeyTypes$5; > /* 11 */ private transient > org.apache.flink.table.types.logical.LogicalType[] aggBufferTypes$6; > /* 12 */ private transient > org.apache.flink.table.runtime.operators.aggregate.BytesHashMap > aggregateMap$7; > /* 13 */ org.apache.flink.table.data.binary.BinaryRowData > emptyAggBuffer$9 = new org.apache.flink.table.data.binary.BinaryRowData(1); > /* 14 */ org.apache.flink.table.data.writer.BinaryRowWriter > emptyAggBufferWriterTerm$10 = new > org.apache.flink.table.data.writer.BinaryRowWriter(emptyAggBuffer$9); > /* 15 */ org.apache.flink.table.data.GenericRowData hashAggOutput = > new org.apache.flink.table.data.GenericRowData(2); > /* 16 */ private transient > org.apache.flink.table.data.binary.BinaryRowData reuseAggMapKey$17 = new > org.apache.flink.table.data.binary.BinaryRowData(1); > /* 17 */ private transient > org.apache.flink.table.data.binary.BinaryRowData reuseAggBuffer$18 = new > org.apache.flink.table.data.binary.BinaryRowData(1); > /* 18 */ private transient > org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry > reuseAggMapEntry$19 = new > org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry(reuseAggMapKey$17, > reuseAggBuffer$18); > /* 19 */ org.apache.flink.table.data.binary.BinaryRowData aggMapKey$3 > = new org.apache.flink.table.data.binary.BinaryRowData(1); > /* 20 */ org.apache.flink.table.data.writer.BinaryRowWriter > aggMapKeyWriter$4 = new > org.apache.flink.table.data.writer.BinaryRowWriter(aggMapKey$3); > /* 21 */ private boolean hasInput = false; > /* 22 */ org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element = new > org.apache.flink.streaming.runtime.streamrecord.StreamRecord((Object)null); > /* 23 */ private final > org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new > org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); > /* 24 */ > /* 25 */ public LocalHashWinAggWithoutKeys$59( > /* 26 */ Object[] references, > /* 27 */ org.apache.flink.streaming.runtime.tasks.StreamTask task, > /* 28 */ org.apache.flink.streaming.api.graph.StreamConfig config, > /* 29 */ org.apache.flink.streaming.api.operators.Output output, > /* 30 */ > org.apache.flink.streaming.runtime.tasks.ProcessingTimeService > processingTimeService) throws Exception { > /* 31 */ this.references = references; > /* 32 */ aggMapKeyTypes$5 = > (((org.apache.flink.table.types.logical.LogicalType[]) references[0])); > /* 33 */ aggBufferTypes$6 = > (((org.apache.flink.table.types.logical.LogicalType[]) references[1])); > /* 34 */ this.setup(task, config, output); > /* 35 */ if (this instanceof > org.apache.flink.streaming.api.operators.AbstractStreamOperator) { > /* 36 */ > ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) > /* 37 */ .setProcessingTimeService(processingTimeService); > /* 38 */ } > /* 39 */ } > /* 40 */ > /* 41 */ @Override > /* 42 */ public void open() throws Exception { > /* 43 */ super.open(); > /* 44 */ aggregateMap$7 = new > org.apache.flink.table.runtime.operators.aggregate.BytesHashMap(this.getContainingTask(),this.getContainingTask().getEnvironment().getMemoryManager(),computeMemorySize(), > aggMapKeyTypes$5, aggBufferTypes$6); > /* 45 */ > /* 46 */ > /* 47 */ emptyAggBufferWriterTerm$10.reset(); > /* 48 */ > /* 49 */ > /* 50 */ if (true) { > /* 51 */ emptyAggBufferWriterTerm$10.setNullAt(0); > /* 52 */ } else { > /* 53 */ emptyAggBufferWriterTerm$10.writeInt(0, ((int) -1)); > /* 54 */ } > /* 55 */ > /* 56 */ emptyAggBufferWriterTerm$10.complete(); > /* 57 */ > /* 58 */ } > /* 59 */ > /* 60 */ @Override > /* 61 */ public void > processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord > element) throws Exception { > /* 62 */ org.apache.flink.table.data.RowData in1 = > (org.apache.flink.table.data.RowData) element.getValue(); > /* 63 */ > /* 64 */ org.apache.flink.table.data.binary.BinaryRowData > currentAggBuffer$8; > /* 65 */ int field$11; > /* 66 */ boolean isNull$11; > /* 67 */ int field$12; > /* 68 */ boolean isNull$12; > /* 69 */ boolean isNull$13; > /* 70 */ int result$14; > /* 71 */ > org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo > lookupInfo$20; > /* 72 */ org.apache.flink.table.data.TimestampData field$21; > /* 73 */ boolean isNull$21; > /* 74 */ boolean isNull$22; > /* 75 */ long result$23; > /* 76 */ boolean isNull$24; > /* 77 */ long result$25; > /* 78 */ boolean isNull$26; > /* 79 */ long result$27; > /* 80 */ boolean isNull$28; > /* 81 */ long result$29; > /* 82 */ boolean isNull$30; > /* 83 */ long result$31; > /* 84 */ boolean isNull$32; > /* 85 */ long result$33; > /* 86 */ boolean isNull$34; > /* 87 */ boolean result$35; > /* 88 */ boolean isNull$36; > /* 89 */ long result$37; > /* 90 */ boolean isNull$38; > /* 91 */ long result$39; > /* 92 */ boolean isNull$40; > /* 93 */ long result$41; > /* 94 */ boolean isNull$42; > /* 95 */ long result$43; > /* 96 */ boolean isNull$44; > /* 97 */ long result$45; > /* 98 */ boolean isNull$46; > /* 99 */ long result$47; > /* 100 */ boolean isNull$48; > /* 101 */ long result$49; > /* 102 */ boolean isNull$50; > /* 103 */ long result$51; > /* 104 */ boolean isNull$52; > /* 105 */ long result$53; > /* 106 */ boolean isNull$55; > /* 107 */ long result$56; > /* 108 */ boolean isNull$57; > /* 109 */ long result$58; > /* 110 */ > /* 111 */ > /* 112 */ if (!in1.isNullAt(1)) { > /* 113 */ hasInput = true; > /* 114 */ // input field access for group key projection, > window/pane assign > /* 115 */ // and aggregate map update > /* 116 */ isNull$11 = in1.isNullAt(0); > /* 117 */ field$11 = -1; > /* 118 */ if (!isNull$11) { > /* 119 */ field$11 = in1.getInt(0); > /* 120 */ } > /* 121 */ isNull$21 = in1.isNullAt(1); > /* 122 */ field$21 = null; > /* 123 */ if (!isNull$21) { > /* 124 */ field$21 = in1.getTimestamp(1, 3); > /* 125 */ } > /* 126 */ // assign timestamp(window or pane) > /* 127 */ > /* 128 */ > /* 129 */ > /* 130 */ > /* 131 */ > /* 132 */ isNull$22 = isNull$21; > /* 133 */ result$23 = -1L; > /* 134 */ if (!isNull$22) { > /* 135 */ > /* 136 */ result$23 = field$21.getMillisecond(); > /* 137 */ > /* 138 */ } > /* 139 */ > /* 140 */ > /* 141 */ isNull$24 = isNull$22 || false; > /* 142 */ result$25 = -1L; > /* 143 */ if (!isNull$24) { > /* 144 */ > /* 145 */ result$25 = (long) (result$23 * ((long) 1L)); > /* 146 */ > /* 147 */ } > /* 148 */ > /* 149 */ isNull$26 = isNull$21; > /* 150 */ result$27 = -1L; > /* 151 */ if (!isNull$26) { > /* 152 */ > /* 153 */ result$27 = field$21.getMillisecond(); > /* 154 */ > /* 155 */ } > /* 156 */ > /* 157 */ > /* 158 */ isNull$28 = isNull$26 || false; > /* 159 */ result$29 = -1L; > /* 160 */ if (!isNull$28) { > /* 161 */ > /* 162 */ result$29 = (long) (result$27 * ((long) 1L)); > /* 163 */ > /* 164 */ } > /* 165 */ > /* 166 */ > /* 167 */ isNull$30 = isNull$28 || false; > /* 168 */ result$31 = -1L; > /* 169 */ if (!isNull$30) { > /* 170 */ > /* 171 */ result$31 = (long) (result$29 - ((long) 0L)); > /* 172 */ > /* 173 */ } > /* 174 */ > /* 175 */ > /* 176 */ isNull$32 = isNull$30 || false; > /* 177 */ result$33 = -1L; > /* 178 */ if (!isNull$32) { > /* 179 */ > /* 180 */ result$33 = (long) (result$31 % ((long) 432000000L)); > /* 181 */ > /* 182 */ } > /* 183 */ > /* 184 */ > /* 185 */ isNull$34 = isNull$32 || false; > /* 186 */ result$35 = false; > /* 187 */ if (!isNull$34) { > /* 188 */ > /* 189 */ result$35 = result$33 < ((int) 0); > /* 190 */ > /* 191 */ } > /* 192 */ > /* 193 */ long result$54 = -1L; > /* 194 */ boolean isNull$54; > /* 195 */ if (result$35) { > /* 196 */ > /* 197 */ > /* 198 */ > /* 199 */ > /* 200 */ > /* 201 */ > /* 202 */ isNull$36 = isNull$21; > /* 203 */ result$37 = -1L; > /* 204 */ if (!isNull$36) { > /* 205 */ > /* 206 */ result$37 = field$21.getMillisecond(); > /* 207 */ > /* 208 */ } > /* 209 */ > /* 210 */ > /* 211 */ isNull$38 = isNull$36 || false; > /* 212 */ result$39 = -1L; > /* 213 */ if (!isNull$38) { > /* 214 */ > /* 215 */ result$39 = (long) (result$37 * ((long) 1L)); > /* 216 */ > /* 217 */ } > /* 218 */ > /* 219 */ > /* 220 */ isNull$40 = isNull$38 || false; > /* 221 */ result$41 = -1L; > /* 222 */ if (!isNull$40) { > /* 223 */ > /* 224 */ result$41 = (long) (result$39 - ((long) 0L)); > /* 225 */ > /* 226 */ } > /* 227 */ > /* 228 */ > /* 229 */ isNull$42 = isNull$40 || false; > /* 230 */ result$43 = -1L; > /* 231 */ if (!isNull$42) { > /* 232 */ > /* 233 */ result$43 = (long) (result$41 % ((long) 432000000L)); > /* 234 */ > /* 235 */ } > /* 236 */ > /* 237 */ > /* 238 */ isNull$44 = isNull$42 || false; > /* 239 */ result$45 = -1L; > /* 240 */ if (!isNull$44) { > /* 241 */ > /* 242 */ result$45 = (long) (result$43 + ((long) 432000000L)); > /* 243 */ > /* 244 */ } > /* 245 */ > /* 246 */ isNull$54 = isNull$44; > /* 247 */ if (!isNull$54) { > /* 248 */ result$54 = result$45; > /* 249 */ } > /* 250 */ } > /* 251 */ else { > /* 252 */ > /* 253 */ > /* 254 */ > /* 255 */ > /* 256 */ > /* 257 */ isNull$46 = isNull$21; > /* 258 */ result$47 = -1L; > /* 259 */ if (!isNull$46) { > /* 260 */ > /* 261 */ result$47 = field$21.getMillisecond(); > /* 262 */ > /* 263 */ } > /* 264 */ > /* 265 */ > /* 266 */ isNull$48 = isNull$46 || false; > /* 267 */ result$49 = -1L; > /* 268 */ if (!isNull$48) { > /* 269 */ > /* 270 */ result$49 = (long) (result$47 * ((long) 1L)); > /* 271 */ > /* 272 */ } > /* 273 */ > /* 274 */ > /* 275 */ isNull$50 = isNull$48 || false; > /* 276 */ result$51 = -1L; > /* 277 */ if (!isNull$50) { > /* 278 */ > /* 279 */ result$51 = (long) (result$49 - ((long) 0L)); > /* 280 */ > /* 281 */ } > /* 282 */ > /* 283 */ > /* 284 */ isNull$52 = isNull$50 || false; > /* 285 */ result$53 = -1L; > /* 286 */ if (!isNull$52) { > /* 287 */ > /* 288 */ result$53 = (long) (result$51 % ((long) 432000000L)); > /* 289 */ > /* 290 */ } > /* 291 */ > /* 292 */ isNull$54 = isNull$52; > /* 293 */ if (!isNull$54) { > /* 294 */ result$54 = result$53; > /* 295 */ } > /* 296 */ } > /* 297 */ isNull$55 = isNull$24 || isNull$54; > /* 298 */ result$56 = -1L; > /* 299 */ if (!isNull$55) { > /* 300 */ > /* 301 */ result$56 = (long) (result$25 - result$54); > /* 302 */ > /* 303 */ } > /* 304 */ > /* 305 */ > /* 306 */ isNull$57 = isNull$55 || false; > /* 307 */ result$58 = -1L; > /* 308 */ if (!isNull$57) { > /* 309 */ > /* 310 */ result$58 = (long) (result$56 - ((long) 0L)); > /* 311 */ > /* 312 */ } > /* 313 */ > /* 314 */ // process each input > /* 315 */ > /* 316 */ // build aggregate map key > /* 317 */ > /* 318 */ > /* 319 */ aggMapKeyWriter$4.reset(); > /* 320 */ > /* 321 */ > /* 322 */ if (false) { > /* 323 */ aggMapKeyWriter$4.setNullAt(0); > /* 324 */ } else { > /* 325 */ aggMapKeyWriter$4.writeLong(0, result$58); > /* 326 */ } > /* 327 */ > /* 328 */ aggMapKeyWriter$4.complete(); > /* 329 */ > /* 330 */ // aggregate by each input with assigned timestamp > /* 331 */ // look up output buffer using current key (grouping > keys ..., assigned timestamp) > /* 332 */ lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3); > /* 333 */ currentAggBuffer$8 = lookupInfo$20.getValue(); > /* 334 */ if (!lookupInfo$20.isFound()) { > /* 335 */ > /* 336 */ // append empty agg buffer into aggregate map for > current group key > /* 337 */ try { > /* 338 */ currentAggBuffer$8 = > /* 339 */ aggregateMap$7.append(lookupInfo$20, > emptyAggBuffer$9); > /* 340 */ } catch (java.io.EOFException exp) { > /* 341 */ > /* 342 */ LOG$2.info("BytesHashMap out of memory with {} entries, > output directly.", aggregateMap$7.getNumElements()); > /* 343 */ // hash map out of memory, output directly > /* 344 */ > /* 345 */ > org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> > iterator = > /* 346 */ aggregateMap$7.getEntryIterator(); > /* 347 */ while (iterator.next(reuseAggMapEntry$19) != null) { > /* 348 */ > /* 349 */ > /* 350 */ > /* 351 */ hashAggOutput.replace(reuseAggMapKey$17, > reuseAggBuffer$18); > /* 352 */ > /* 353 */ output.collect(outElement.replace(hashAggOutput)); > /* 354 */ } > /* 355 */ > /* 356 */ // retry append > /* 357 */ > /* 358 */ // reset aggregate map retry append > /* 359 */ aggregateMap$7.reset(); > /* 360 */ lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3); > /* 361 */ try { > /* 362 */ currentAggBuffer$8 = > /* 363 */ aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9); > /* 364 */ } catch (java.io.EOFException e) { > /* 365 */ throw new OutOfMemoryError("BytesHashMap Out of > Memory."); > /* 366 */ } > /* 367 */ > /* 368 */ > /* 369 */ } > /* 370 */ } > /* 371 */ // aggregate buffer fields access > /* 372 */ isNull$12 = currentAggBuffer$8.isNullAt(0); > /* 373 */ field$12 = -1; > /* 374 */ if (!isNull$12) { > /* 375 */ field$12 = currentAggBuffer$8.getInt(0); > /* 376 */ } > /* 377 */ // do aggregate and update agg buffer > /* 378 */ int result$16 = -1; > /* 379 */ boolean isNull$16; > /* 380 */ if (isNull$11) { > /* 381 */ > /* 382 */ isNull$16 = isNull$12; > /* 383 */ if (!isNull$16) { > /* 384 */ result$16 = field$12; > /* 385 */ } > /* 386 */ } > /* 387 */ else { > /* 388 */ int result$15 = -1; > /* 389 */ boolean isNull$15; > /* 390 */ if (isNull$12) { > /* 391 */ > /* 392 */ isNull$15 = isNull$11; > /* 393 */ if (!isNull$15) { > /* 394 */ result$15 = field$11; > /* 395 */ } > /* 396 */ } > /* 397 */ else { > /* 398 */ > /* 399 */ > /* 400 */ > /* 401 */ isNull$13 = isNull$12 || isNull$11; > /* 402 */ result$14 = -1; > /* 403 */ if (!isNull$13) { > /* 404 */ > /* 405 */ result$14 = (int) (field$12 + field$11); > /* 406 */ > /* 407 */ } > /* 408 */ > /* 409 */ isNull$15 = isNull$13; > /* 410 */ if (!isNull$15) { > /* 411 */ result$15 = result$14; > /* 412 */ } > /* 413 */ } > /* 414 */ isNull$16 = isNull$15; > /* 415 */ if (!isNull$16) { > /* 416 */ result$16 = result$15; > /* 417 */ } > /* 418 */ } > /* 419 */ if (isNull$16) { > /* 420 */ currentAggBuffer$8.setNullAt(0); > /* 421 */ } else { > /* 422 */ currentAggBuffer$8.setInt(0, result$16); > /* 423 */ } > /* 424 */ > /* 425 */ } > /* 426 */ } > /* 427 */ > /* 428 */ > /* 429 */ @Override > /* 430 */ public void endInput() throws Exception { > /* 431 */ org.apache.flink.table.data.binary.BinaryRowData > currentAggBuffer$8; > /* 432 */ int field$11; > /* 433 */ boolean isNull$11; > /* 434 */ int field$12; > /* 435 */ boolean isNull$12; > /* 436 */ boolean isNull$13; > /* 437 */ int result$14; > /* 438 */ > org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo > lookupInfo$20; > /* 439 */ org.apache.flink.table.data.TimestampData field$21; > /* 440 */ boolean isNull$21; > /* 441 */ boolean isNull$22; > /* 442 */ long result$23; > /* 443 */ boolean isNull$24; > /* 444 */ long result$25; > /* 445 */ boolean isNull$26; > /* 446 */ long result$27; > /* 447 */ boolean isNull$28; > /* 448 */ long result$29; > /* 449 */ boolean isNull$30; > /* 450 */ long result$31; > /* 451 */ boolean isNull$32; > /* 452 */ long result$33; > /* 453 */ boolean isNull$34; > /* 454 */ boolean result$35; > /* 455 */ boolean isNull$36; > /* 456 */ long result$37; > /* 457 */ boolean isNull$38; > /* 458 */ long result$39; > /* 459 */ boolean isNull$40; > /* 460 */ long result$41; > /* 461 */ boolean isNull$42; > /* 462 */ long result$43; > /* 463 */ boolean isNull$44; > /* 464 */ long result$45; > /* 465 */ boolean isNull$46; > /* 466 */ long result$47; > /* 467 */ boolean isNull$48; > /* 468 */ long result$49; > /* 469 */ boolean isNull$50; > /* 470 */ long result$51; > /* 471 */ boolean isNull$52; > /* 472 */ long result$53; > /* 473 */ boolean isNull$55; > /* 474 */ long result$56; > /* 475 */ boolean isNull$57; > /* 476 */ long result$58; > /* 477 */ > /* 478 */ > org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> > iterator = > /* 479 */ aggregateMap$7.getEntryIterator(); > /* 480 */ while (iterator.next(reuseAggMapEntry$19) != null) { > /* 481 */ > /* 482 */ > /* 483 */ > /* 484 */ hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18); > /* 485 */ > /* 486 */ output.collect(outElement.replace(hashAggOutput)); > /* 487 */ } > /* 488 */ > /* 489 */ } > /* 490 */ > /* 491 */ > /* 492 */ @Override > /* 493 */ public void close() throws Exception { > /* 494 */ super.close(); > /* 495 */ aggregateMap$7.free(); > /* 496 */ > /* 497 */ } > /* 498 */ > /* 499 */ > /* 500 */ } > /* 501 */ > Traceback (most recent call last): > File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py", > line 32, in <module> > print(table.to_pandas()) > File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 829, > in to_pandas > if batches.hasNext(): > File > "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", > line 1285, in __call__ > return_value = get_return_value( > File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line > 147, in deco > return f(*a, **kw) > File > "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py", > line 326, in get_return_value > raise Py4JJavaError( > py4j.protocol.Py4JJavaError: An error occurred while calling o51.hasNext. > : java.lang.RuntimeException: Failed to fetch next result > at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) > at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77) > at > org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115) > at > org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355) > at > org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644) > at > org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) > at > org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) > at > org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) > at > org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) > at > org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) > at > org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: java.io.IOException: Failed to fetch job execution result > at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175) > at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126) > at > org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103) > ... 16 more > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.client.JobExecutionException: Job execution failed. > at > java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) > at > java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) > at > org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172) > ... 18 more > Caused by: org.apache.flink.runtime.client.JobExecutionException: Job > execution failed. > at > org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119) > at > java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) > at > java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658) > at > java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) > at > org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117) > ... 19 more > Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:217) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:210) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:204) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:526) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:413) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:566) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > Caused by: java.lang.RuntimeException: Could not instantiate generated class > 'LocalHashWinAggWithoutKeys$59' > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67) > at > org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) > at > org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:613) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526) > at > org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) > at java.base/java.lang.Thread.run(Thread.java:834) > Caused by: org.apache.flink.util.FlinkRuntimeException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) > at > org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) > at > org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65) > ... 13 more > Caused by: > org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: > org.apache.flink.api.common.InvalidProgramException: Table program cannot be > compiled. This is a bug. Please file an issue. > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) > at > org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) > ... 15 more > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program > cannot be compiled. This is a bug. Please file an issue. > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) > at > org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) > at > org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) > ... 18 more > Caused by: org.codehaus.commons.compiler.CompileException: Line 351, Column > 33: A method named "replace" is not declared in any enclosing class nor any > supertype, nor through a static import > at > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) > at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997) > at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) > at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) > at > org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) > at > org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) > at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) > at > org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) > at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) > at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) > at > org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) > at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) > at org.codehaus.janino.Java$Block.accept(Java.java:2776) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1842) > at org.codehaus.janino.UnitCompiler.access$2200(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1498) > at > org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$WhileStatement.accept(Java.java:3052) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) > at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) > at org.codehaus.janino.Java$Block.accept(Java.java:2776) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileTryCatch(UnitCompiler.java:3136) > at > org.codehaus.janino.UnitCompiler.compileTryCatchFinally(UnitCompiler.java:2966) > at > org.codehaus.janino.UnitCompiler.compileTryCatchFinallyWithResources(UnitCompiler.java:2770) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2742) > at org.codehaus.janino.UnitCompiler.access$2300(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1499) > at > org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$TryStatement.accept(Java.java:3238) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) > at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) > at org.codehaus.janino.Java$Block.accept(Java.java:2776) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476) > at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495) > at > org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) > at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) > at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) > at org.codehaus.janino.Java$Block.accept(Java.java:2776) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476) > at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495) > at > org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487) > at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) > at > org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) > at > org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) > at > org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) > at > org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) > at > org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) > at > org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) > ... 24 more > {code} > However it works fine in streaming mode: > {code:python} > env_settings = ( > > EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() > ) > table_env = StreamTableEnvironment.create(environment_settings=env_settings) > {code} > How the table is created seems irrelevant - this raises the same error: > {code:python} > from datetime import datetime > from pyflink.table import DataTypes, BatchTableEnvironment, > EnvironmentSettings > from pyflink.table.window import Tumble > env_settings = ( > > EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() > ) > table_environment = > BatchTableEnvironment.create(environment_settings=env_settings) > transactions = table_environment.from_elements( > [ > (1, datetime(2000, 1, 1, 0, 0, 0)), > (-2, datetime(2000, 1, 2, 0, 0, 0)), > (3, datetime(2000, 1, 3, 0, 0, 0)), > (-4, datetime(2000, 1, 4, 0, 0, 0)), > ], > DataTypes.ROW( > [ > DataTypes.FIELD("amount", DataTypes.BIGINT()), > DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), > ] > ), > ) > table = ( > transactions > .window(Tumble.over("5.days").on("ts").alias("__window")) > .group_by("__window") > .select("amount.sum") > ) > print(table.to_pandas()) > {code} -- This message was sent by Atlassian Jira (v8.3.4#803005)