A Table can have at most one time attribute. In your Table, the proc_time
column is a processing-time attribute, and when you define a watermark on
the event_time column, that column becomes an event-time attribute.

If you want to combine event time and processing time, you can use
the PROCTIME() function in your queries without having a processing time
attribute as one of the columns in the table.
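
As a rough sketch of that second option, assuming the table definition from
your message with only the proc_time column dropped (this illustrates the
time-attribute setup and is not a confirmed fix for the compile error):

```sql
-- Sketch: event_time is the table's only time attribute (via the watermark).
CREATE OR REPLACE TABLE input (
    event_header ROW(topic_name STRING),
    `timestamp` STRING NOT NULL,
    event_time AS TO_TIMESTAMP(fix_instant(`timestamp`), 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
    properties ROW(company_id STRING NOT NULL, scope_area STRING, action STRING) NOT NULL,
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'mb-1644796800-qbo',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset',
    'json.ignore-parse-errors' = 'true',
    'json.fail-on-missing-field' = 'false'
);

-- Processing time is obtained in the query instead of from a table column.
SELECT `timestamp`, event_time, PROCTIME() AS proc_time
FROM input
LIMIT 10;
```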

Best,
David

On Wed, Jun 8, 2022 at 9:46 PM Benenson, Michael <
mikhail_benen...@intuit.com> wrote:

> Hi, folks
>
>
>
> *Short description*:
>
> I use the Flink 1.15.0 sql-client and a Java user-defined function in a
> CREATE TABLE … statement to get a timestamp.
>
> It works fine if I do not use the timestamp in a watermark, but if it is
> used in a watermark it causes:
>
> java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
> …
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
> …
> Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column 75: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>
>
> *Details*:
>
> java -version
> openjdk version "11.0.14.1" 2022-02-08 LTS
> OpenJDK Runtime Environment Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS)
> OpenJDK 64-Bit Server VM Corretto-11.0.14.10.1 (build 11.0.14.1+10-LTS, mixed mode)
>
> Flink 1.15.0, flink-1.15.0/bin/sql-client.sh
>
> SET 'sql-client.execution.result-mode' = 'tableau';
> SET 'table.exec.sink.not-null-enforcer' = 'drop';
>
> CREATE TEMPORARY FUNCTION default_catalog.default_database.fix_instant
>     AS 'com.intuit.data.strmprocess.udf.FixInstant' LANGUAGE JAVA;
>
> CREATE OR REPLACE TABLE input (
>     event_header ROW(topic_name STRING),
>     `timestamp` STRING NOT NULL,
>     event_time AS TO_TIMESTAMP(fix_instant(`timestamp`), 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
>     properties ROW(company_id STRING NOT NULL, scope_area STRING, action STRING) NOT NULL,
>     WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND,
>     proc_time AS PROCTIME()
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = 'mb-1644796800-qbo',
>     'properties.bootstrap.servers' = 'localhost:9092',
>     'format' = 'json',
>     'scan.startup.mode' = 'latest-offset',
>     'json.ignore-parse-errors' = 'true',
>     'json.fail-on-missing-field' = 'false'
> );
>
> SELECT `timestamp`, event_time, event_header.topic_name AS topic, properties.company_id AS company
> FROM input
> LIMIT 10;
>
>
>
> This works fine if I comment out WATERMARK FOR event_time …
> It causes an error if WATERMARK FOR event_time is used:
>
> 2022-06-08 12:16:32
> java.lang.RuntimeException: Could not instantiate generated class 'WatermarkGenerator$0'
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:74)
>     at org.apache.flink.table.runtime.generated.GeneratedWatermarkGeneratorSupplier.createWatermarkGenerator(GeneratedWatermarkGeneratorSupplier.java:62)
>     at org.apache.flink.streaming.api.operators.source.ProgressiveTimestampsAndWatermarks.createMainOutput(ProgressiveTimestampsAndWatermarks.java:104)
>     at org.apache.flink.streaming.api.operators.SourceOperator.initializeMainOutput(SourceOperator.java:426)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNextNotReading(SourceOperator.java:402)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:387)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>     at java.base/java.lang.Thread.run(Thread.java:829)
> Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:94)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:101)
>     at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:68)
>     ... 16 more
> Caused by: org.apache.flink.shaded.guava30.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2051)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache.get(LocalCache.java:3962)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4859)
>     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:92)
>     ... 18 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:107)
>     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$0(CompileUtils.java:92)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4864)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3529)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2278)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2155)
>     at org.apache.flink.shaded.guava30.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2045)
>     ... 21 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 30, Column 75: Cannot determine simple type name "org"
>     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12211)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6833)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6594)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6607)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6573)
>     at org.codehaus.janino.UnitCompiler.access$13900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6481)
>     at org.codehaus.janino.UnitCompiler$22$1.visitReferenceType(UnitCompiler.java:6476)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3928)
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6476)
>     at org.codehaus.janino.UnitCompiler$22.visitType(UnitCompiler.java:6469)
>     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3927)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
>     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:7121)
>     at org.codehaus.janino.UnitCompiler.access$17000(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$22$2.visitNewClassInstance(UnitCompiler.java:6529)
>     at org.codehaus.janino.UnitCompiler$22$2.visitNewClassInstance(UnitCompiler.java:6490)
>     at org.codehaus.janino.Java$NewClassInstance.accept(Java.java:5190)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6490)
>     at org.codehaus.janino.UnitCompiler$22.visitRvalue(UnitCompiler.java:6469)
>     at org.codehaus.janino.Java$Rvalue.accept(Java.java:4116)
>     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6469)
>     at org.codehaus.janino.UnitCompiler.findMostSpecificIInvocable(UnitCompiler.java:9237)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9123)
>     at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:9025)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5062)
>     at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4423)
>     at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4396)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4396)
>     at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5662)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3783)
>     at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3762)
>     at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3734)
>     at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5073)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3734)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)
>     at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)
>     at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)
>     at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2874)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)
>     at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)
>     at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)
>     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
>     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
>     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
>     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
>     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
>     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:104)
>     ... 27 more
>
>
>
>
>
> Java code:
>
> package com.intuit.data.strmprocess.udf;
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> /**
>  * Usage in SQL:
>  *
>  *   CREATE TEMPORARY FUNCTION default_catalog.default_database.fix_instant
>  *       AS 'com.intuit.data.strmprocess.udf.FixInstant' LANGUAGE JAVA;
>  *
>  *   CREATE OR REPLACE TABLE input (
>  *       `timestamp` STRING NOT NULL,
>  *       event_time AS TO_TIMESTAMP(fix_instant(`timestamp`), 'yyyy-MM-dd''T''HH:mm:ss.SSS''Z'''),
>  */
> public class FixInstant extends ScalarFunction {
>     private static final long serialVersionUID = -3115461377254640072L;
>
>     public String eval(String s) {
>         if (s == null) return null;
>         else if (s.indexOf('.') > 0) return s;
>         else if (s.endsWith("Z")) return s.replace("Z", ".000Z");
>         else return null;
>     }
> }
>
>
>
> Compiling for Java 8 or Java 11 makes no difference.
>
>
>
> Any ideas?
>
>
>
