A table can have at most one time attribute. In your table, the proc_time column is a processing-time attribute, and defining a watermark on the event_time column makes that column an event-time attribute.
If you want to combine event time and processing time, you can use the PROCTIME() function in your queries without having a processing time attribute as one of the columns in the table.

Best,
David

On Wed, Jun 8, 2022 at 9:46 PM Benenson, Michael <mikhail_benen...@intuit.com> wrote:

> Hi, folks
>
> *Short description*:
>
> I use Flink 1.15.0 sql-client and Java User Define Function in CREATE TABLE … statement to get Timestamp.
> It works OK, if I do not use Timestamp in Watermark, but if used in Watermark it causes
>
> java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
> …
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
> …
> Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column 75: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>
> *Details*:
>
> java -version
> openjdk version "11.0.14.1" 2022-02-08 LTS
> OpenJDK Runtime Environment Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS)
> OpenJDK 64-Bit Server VM Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS, mixed mode)
>
> Flink 1.15.0, flink-1.15.0/bin/sql-client.sh
>
> SET 'sql-client.execution.result-mode' = 'tableau';
> SET 'table.exec.sink.not-null-enforcer' = 'drop';
>
> CREATE TEMPORARY FUNCTION default_catalog.default_database.fix_instant
>     AS 'com.intuit.data.strmprocess.udf.FixInstant' LANGUAGE JAVA;
>
> CREATE OR REPLACE TABLE input (
>     event_header ROW(topic_name STRING),
>     `timestamp` STRING NOT NULL,
>     event_time AS TO_TIMESTAMP(fix_instant(`timestamp`), 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
>     properties ROW(company_id STRING NOT NULL, scope_area STRING, action STRING) NOT NULL,
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND,
>     proc_time AS PROCTIME()
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'mb-1644796800-qbo',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'format' = 'json',
>     'scan.startup.mode' = 'latest-offset',
>     'json.ignore-parse-errors' = 'true',
>     'json.fail-on-missing-field' = 'false'
> );
>
> SELECT `timestamp`, event_time, event_header.topic_name AS topic, properties.company_id as company FROM input
> LIMIT 10
> ;
>
> Works fine, if I comment WATERMARK FOR event_time …
> Causes an error, if WATERMARK FOR event_time is used:
>
> 2022-06-08 12:16:32
> java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
>     at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
>     at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
>     at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     ... 16 more
> Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 18 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
>     ... 21 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column 75: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573)
>     at org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481)
>     at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928)
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476)
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3927)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7121)
>     at org.codehaus.janino.UnitCompiler.access$17000(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$22$2.visitNewClassInstance(UnitCompiler.java:6529)
>     at org.codehaus.janino.UnitCompiler$22$2.visitNewClassInstance(UnitCompiler.java:6490)
>     at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:5190)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
>     at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9237)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9123)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9025)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5062)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
>     ... 27 more
>
> Java code:
>
> package com.intuit.data.strmprocess.udf;
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> /**
>  * Usage in SQL:
>  * CREATE TEMPORARY FUNCTION default_catalog.default_database.fix_instant AS 'com.intuit.data.strmprocess.udf.FixInstant' LANGUAGE JAVA;
>  * CREATE OR REPLACE TABLE input (
>  *   `timestamp` STRING NOT NULL,
>  *   event_time AS TO_TIMESTAMP(fix_instant(`timestamp`), 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
>  */
> public class FixInstant extends ScalarFunction {
>     private static final long serialVersionUID = -3115461377254640072L;
>
>     public String eval(String s) {
>         if (s == null) return null;
>         else if (s.indexOf('.') > 0) return s;
>         else if (s.endsWith("Z")) return s.replace("Z", ".000Z");
>         else return null;
>     }
> }
>
> Compiled for Java 8 or Java 11, no differences
>
> Any ideas?
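
For reference, the suggestion at the top of this reply could look roughly like the following. This is only a sketch: it reuses the table definition from the quoted message, drops the proc_time column so that event_time is the only time attribute declared on the table, and calls PROCTIME() in the query instead. The proc_time alias in the SELECT is just an illustrative name, and whether this change also makes the 'WatermarkGenerator$0' compile error go away is not established in this thread.

```sql
-- Sketch only: same table as in the quoted message, minus the proc_time column,
-- so the watermark on event_time declares the table's only time attribute.
CREATE OR REPLACE TABLE input (
    event_header ROW(topic_name STRING),
    `timestamp` STRING NOT NULL,
    event_time AS TO_TIMESTAMP(fix_instant(`timestamp`), 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
    properties ROW(company_id STRING NOT NULL, scope_area STRING, action STRING) NOT NULL,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'mb-1644796800-qbo',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false'
);

-- Processing time is requested per query by calling PROCTIME() directly.
-- The alias proc_time is an illustrative choice, not part of the original query.
SELECT
    `timestamp`,
    event_time,
    PROCTIME() AS proc_time,
    event_header.topic_name AS topic,
    properties.company_id AS company
FROM input
LIMIT 10;
```

The design choice here is simply to keep the DDL limited to the event-time declaration and move processing time into the queries that need it.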