Wrong results for join post tumble grouping

2020-11-02 Thread Satyam Shekhar
Hello,


I have a table T0 with the following schema -

root
  |-- amount: BIGINT
  |-- timestamp: TIMESTAMP(3)


The table T0 has two rows -
amount timestamp
0 0
1 8640


The following query with tumble grouping returns the wrong result -

WITH CTE AS
  (SELECT SUM(amount) AS _output,
  TUMBLE_END(`timestamp`, INTERVAL '1' SECOND) AS _dim0
   FROM T0 GROUP BY TUMBLE(`timestamp`, INTERVAL '1' SECOND))
SELECT V0._output as V0_output, V1._output AS V1_output,
   V0._dim0 as V0_time, V1._dim0 as V1_time
   FROM CTE as V0 INNER JOIN CTE V1 ON V0._dim0 = V1._dim0



The returned result is -
V0_output V1_output V0_time V1_time
1 1 86401000 86401000


The expected result is -
V0_output V1_output V0_time V1_time
0 0 1000 1000
1 1 86401000 86401000
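
For reference, the expected result can be reproduced with a small plain-Java model of the query. This is only a sketch, not Flink code: the class and method names are hypothetical, and the second row's timestamp is assumed to be 86400000 ms, which is what the reported window end of 86401000 implies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumbleJoinSketch {
    // End of the tumbling window that contains ts (epoch millis).
    static long tumbleEnd(long ts, long sizeMs) {
        return ts - Math.floorMod(ts, sizeMs) + sizeMs;
    }

    // Models the CTE: SUM(amount) keyed by window end (_dim0).
    static TreeMap<Long, Long> cte(long[][] rows, long sizeMs) {
        TreeMap<Long, Long> sums = new TreeMap<>();
        for (long[] r : rows) {
            sums.merge(tumbleEnd(r[1], sizeMs), r[0], Long::sum);
        }
        return sums;
    }

    // Inner self-join on _dim0; each row is
    // {V0_output, V1_output, V0_time, V1_time}.
    static List<long[]> selfJoin(TreeMap<Long, Long> cte) {
        List<long[]> out = new ArrayList<>();
        for (Map.Entry<Long, Long> v0 : cte.entrySet()) {
            Long v1 = cte.get(v0.getKey());
            if (v1 != null) {
                out.add(new long[] {v0.getValue(), v1, v0.getKey(), v0.getKey()});
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // {amount, timestamp}; the second timestamp is an assumption.
        long[][] t0 = {{0L, 0L}, {1L, 86_400_000L}};
        for (long[] row : selfJoin(cte(t0, 1000L))) {
            System.out.println(row[0] + " " + row[1] + " " + row[2] + " " + row[3]);
        }
    }
}
```

Since every window end is unique in the CTE, the self-join should return exactly one row per window, which is why two rows are expected here.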


Running subquery for `CTE` returns the correct result -

SELECT SUM(amount) AS _output,
   TUMBLE_END(`timestamp`, INTERVAL '1' SECOND) AS _dim0
FROM T0 GROUP BY TUMBLE(`timestamp`, INTERVAL '1' SECOND)


Result (this is correct) -
_output _dim0
0 1000
1 86401000

Also, the following query without tumble grouping returns the correct
result -

WITH CTE AS
  (SELECT amount AS _output, `timestamp` AS _dim0 FROM T0)
SELECT V0._output as V0_output, V1._output AS V1_output,
   V0._dim0 as V0_time, V1._dim0 as V1_time
   FROM CTE as V0 INNER JOIN CTE V1 ON V0._dim0 = V1._dim0

Result -
V0_output V1_output V0_time V1_time
0 0 0 0
1 1 8640 8640

I have filed a JIRA for the issue -
https://issues.apache.org/jira/browse/FLINK-19926#. Would love to get some
eyes on it.

Regards,
Satyam


Failure to execute streaming SQL query

2020-11-05 Thread Satyam Shekhar
Hello,

I have a table T0 with the following schema -

root
  |-- amount: BIGINT
  |-- timestamp: TIMESTAMP(3)

The following two queries fail execution on the above table when executed
in streaming mode using the Blink planner.

WITH A AS (
  SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) as tm
FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm

WITH A AS (
  SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE)
as tm
FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm

The two queries are very similar and differ only in their use of the
tumble_end and tumble_rowtime functions. Both queries use the timestamp
column as their rowtime attribute. Casting the "tm" column to TIMESTAMP(3)
makes both queries work -

WITH A AS (
  SELECT COUNT(*) AS ct, CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE)
as TIMESTAMP(3)) as tm
FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
select L.ct, R.ct, L.tm, R.tm from A as L left join A as R on L.tm = R.tm

This workaround, however, loses the rowtime attribute from the output
result set for the second query.

The first query fails with the following exception -

java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to
class java.lang.Long (java.sql.Timestamp is in module java.sql of loader
'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at SinkConversion$166.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at StreamExecCalc$163.processElement(Unknown Source)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
at
org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)
at
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lan

Pushing Down Filters

2021-01-11 Thread Satyam Shekhar
Hello,

I am using Flink 1.11.2 as the execution engine for an alerting
application. Our application builds atop Flink's SQL API to run streaming
and batch jobs on a proprietary storage engine. We have a custom
StreamTableSource implementation that connects to our storage engine. The
connector currently implements the ProjectableTableSource interface. I now
wish to extend the connector to push down filters to the source for
improved performance. I have run into multiple issues in that effort -

1. The optimizer does not use both ProjectableTableSource and
FilterableTableSource in a single query, even if the source implements both
interfaces. Each interface works correctly if implemented independently.

2. Implementations of FilterableTableSource fail inside the optimizer for a
few TPC-DS queries in batch mode.

Stacktrace:

java.lang.AssertionError: OR(=($16, _UTF-16LE'Home'), OR(=($16,
_UTF-16LE'Books'), =($16, _UTF-16LE'Sports')))
   at org.apache.calcite.rel.core.Filter.<init>(Filter.java:74)
   at
org.apache.calcite.rel.logical.LogicalFilter.<init>(LogicalFilter.java:68)
   at
org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:126)
   at
org.apache.calcite.rel.logical.LogicalFilter.copy(LogicalFilter.java:45)
   at
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.pushFilterIntoScan(PushFilterIntoLegacyTableSourceScanRule.scala:130)
   at
org.apache.flink.table.planner.plan.rules.logical.PushFilterIntoLegacyTableSourceScanRule.onMatch(PushFilterIntoLegacyTableSourceScanRule.scala:77)
   at
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328)
   ...
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
   ...
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
   ...
   at
org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:86)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.org$apache$flink$table$planner$plan$optimize$BatchCommonSubGraphBasedOptimizer$$optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:57)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer$$anonfun$doOptimize$1.apply(BatchCommonSubGraphBasedOptimizer.scala:45)
   at scala.collection.immutable.List.foreach(List.scala:392)
   at
org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
   at
org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
   at
org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
   ...
   at
org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)


Config:
  var settings = EnvironmentSettings.newInstance()
      .useBlinkPlanner()
      .inBatchMode()
      .build();
  TableEnvironment.create(settings);

3. Finally, filter expressions containing the current_timestamp (or now)
function are not resolved to constant values during predicate pushdown in
the optimizer. Take the following SQL query for example - select count(*)
from T0 where T0.C2 >= current_timestamp. Here, the applyPredicate method of
FilterableTableSource receives the predicate as a CallExpression of the form
greaterThanOrEqual(C2, currentTimestamp()). I'd have expected
currentTimestamp to be resolved to a constant value that is identical across
all usages of currentTimestamp in the query.
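
To make the expectation in (3) concrete, here is a minimal plain-Java sketch of the folding I have in mind. The Call class is a hypothetical stand-in for a call expression and is not Flink's CallExpression API; fold and queryStartMillis are likewise names made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class TimestampFoldingSketch {
    // Hypothetical stand-in for a call such as
    // greaterThanOrEqual(C2, currentTimestamp()); not a Flink class.
    static final class Call {
        final String name;
        // Each argument is a Call, a column name (String) or a literal (Long).
        final List<Object> args;

        Call(String name, List<Object> args) {
            this.name = name;
            this.args = args;
        }
    }

    // Returns a copy of the tree in which every currentTimestamp() call is
    // replaced by the same literal, so all usages in one query agree.
    static Object fold(Object expr, long queryStartMillis) {
        if (!(expr instanceof Call)) {
            return expr; // column names and literals are left untouched
        }
        Call call = (Call) expr;
        if (call.name.equals("currentTimestamp") && call.args.isEmpty()) {
            return queryStartMillis;
        }
        List<Object> folded = new ArrayList<>();
        for (Object arg : call.args) {
            folded.add(fold(arg, queryStartMillis));
        }
        return new Call(call.name, folded);
    }
}
```

With folding like this, applyPredicate would see greaterThanOrEqual(C2, <literal>) rather than a nested function call, and the literal would be captured once per query.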

Regards,
Satyam


Time Temporal Join

2021-03-10 Thread Satyam Shekhar
Hello folks,

I am looking to enrich rows from an unbounded streaming table by joining it
with a bounded static table while preserving rowtime for the streaming
table. For example, let's consider two tables F and D, where F is
unbounded and D is bounded. The schema for the two tables is the following -

F:
 |-- C0: BIGINT
 |-- C1: STRING
 |-- R: TIMESTAMP(3) **rowtime**
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL

I'd like to run the following query on this schema -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

However, I run into the following error while running the above query -

"Rowtime attributes must not be in the input rows of a regular join. As a
workaround you can cast the time attributes of input tables to TIMESTAMP
before."

My understanding from reading the docs is that Time Temporal Join is meant
to solve this problem. So I modeled table D as follows -

D:
 |-- C0: BIGINT
 |-- C1: STRING NOT NULL
 |-- R: TIMESTAMP(3)
 |-- WATERMARK FOR R: TIMESTAMP(3) AS `R` - INTERVAL '0' SECONDS
 |-- CONSTRAINT 2da2dd2e-9937-48cb-9dec-4f6055713004 PRIMARY KEY (C1)

I always set column D.R to 0 and modified the query as follows -

select sum(F.C0), D.C1, tumble_start(F.R, interval '1' second)
from F join D FOR SYSTEM_TIME AS OF F.R ON F.C1 = D.C1
group by D.C1, tumble(F.R, interval '1' second)

The above query runs but does not return any result. I have the following
data in D initially -
Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
Emit D watermark=0

And F streams the following rows -
Emit F row=+I(0,"0",1970-01-01T00:00)@time=0
Emit F row=+I(1,"1",1970-01-01T00:00:10)@time=1000
Emit F watermark=1000

I expect that two rows in F will join with matching rows (on C1) in D and
produce some output. But I do not see anything in the output.

So I have the following questions -

1. Is time temporal join the correct tool to solve this problem?
2. What could be the reason for not getting any output rows in the result?

Thanks,
Satyam


Re: Time Temporal Join

2021-03-15 Thread Satyam Shekhar
Hello folks,

I would love to hear your feedback on this.

Regards,
Satyam



Re: Time Temporal Join

2021-03-25 Thread Satyam Shekhar
Hi Timo,

Apologies for the late response. I somehow seem to have missed your reply.

I do want the join to be "time-based" since I need to perform a tumble
grouping operation on top of the join.

I tried setting the watermark strategy to `R` - INTERVAL '0.001' SECONDS,
but that didn't help either.

Note that we have a custom connector to an internal storage engine. The
connector implements ScanTableSource interface with
SupportsWatermarkPushDown ability. Would the watermark strategy in the
table schema matter in that case? I changed the query to the following to
simplify further -

select F.C0, F.C1, F.R, D.C0, D.C1, D.R from F JOIN D FOR SYSTEM_TIME AS OF
F.R ON F.C1 = D.C1

I still do not see any output from the pipeline. The overall logs I see
from the connector are the following -

Emit D.D row=+I(0,0,1970-01-01T00:00)@time=0  --> ctx.collectWithTimestamp(row_, rowtime);
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.D row=+I(1,1,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00)@time=0
Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F row=+I(3,3,1970-01-01T00:00:03)@time=3000
Emit D.D row=+I(3,3,1970-01-01T00:00)@time=0
Emit D.F row=+I(4,4,1970-01-01T00:00:04)@time=4000
Emit D.F wm=4000  --->  ctx.emitWatermark(new Watermark(wm));
Emit D.D wm=0

Now, if I change the rowtime of table D to 1s instead of 0, I get one row
as output.

Emit D.D row=+I(0,0,1970-01-01T00:00:01)@time=1000
Emit D.F row=+I(0,0,1970-01-01T00:00)@time=0
Emit D.F row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(1,1,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(2,2,1970-01-01T00:00:01)@time=1000
Emit D.D row=+I(3,3,1970-01-01T00:00:01)@time=1000
Emit D.F wm=1000
Emit D.D wm=1000

reply: (1, "1", 1000, 1, "1", 1000)

The next row streamed from F which should join with a row emitted from D
does not emit any output -

Emit D.F row=+I(2,2,1970-01-01T00:00:02)@time=2000
Emit D.F wm=2000
NO REPLY

My understanding of temporal joins is that the latest row from D should be
picked for joining rows from F. Is my expectation wrong that the (2, 2, 2s)
row in F should join with the (2, 2, 1s) row in D?
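
To state that expectation precisely, here is a plain-Java sketch of the version lookup I have in mind. It is not Flink's join operator: the class and method names are hypothetical, and it deliberately ignores watermarks, which a real event-time join also waits on before emitting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class TemporalLookupSketch {
    // Versions of table D: key (C1) -> row time in millis -> C0.
    private final Map<String, TreeMap<Long, Long>> versions = new HashMap<>();

    void addVersion(String c1, long rowTimeMs, long c0) {
        versions.computeIfAbsent(c1, k -> new TreeMap<>()).put(rowTimeMs, c0);
    }

    // Latest version of key c1 whose row time is <= the probe row's time,
    // or null if no such version exists.
    Map.Entry<Long, Long> lookup(String c1, long probeTimeMs) {
        TreeMap<Long, Long> byTime = versions.get(c1);
        return byTime == null ? null : byTime.floorEntry(probeTimeMs);
    }
}
```

Under this model, a row of F with key "2" at 2s finds the version of D with key "2" at 1s, while a row of F at 0s finds nothing when every row of D is stamped 1s.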

Regards,
Satyam


On Tue, Mar 16, 2021 at 5:54 AM Timo Walther  wrote:

> Hi Satyam,
>
> first of all your initial join query can also work, you just need to
> make sure that no time attribute is in the SELECT clause. As the
> exception indicates, you need to cast all time attributes to TIMESTAMP.
> The reason for this is some major design issue that is also explained
> here where a time attribute must not be in the output of a regular join:
>
> https://stackoverflow.com/a/64500296/806430
>
> However, since you would like to perform the join "time-based" either
> interval join or temporal join might solve your use cases.
>
> In your case I guess the watermark strategy of D is the problem. Are you
> sure the result is:
>
>  > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>  > Emit D watermark=0
>
> and not:
>
>  > Emit D row=+I(0,"0",1970-01-01T00:00)@time=0
>  > Emit D watermark=0
>  > Emit D row=+I(1,"1",1970-01-01T00:00)@time=0
>  > Emit D row=+I(2,"2",1970-01-01T00:00)@time=0
>
> Or maybe the watermark is even dropped. Could you try to use a watermark
> strategy with
>
> `R` - INTERVAL '0.001' SECONDS
>
> I hope this helps.
>
> Regards,
> Timo
>
>
>

Sorting Bounded Streams

2020-05-29 Thread Satyam Shekhar
Hello,

I am using Flink as the streaming execution engine for building a
low-latency alerting application. The use case also requires ad-hoc
querying on batch data, which I also plan to serve using Flink to avoid the
complexity of maintaining two separate engines.

My current understanding is that the Order By operator in the Blink planner
(on DataStream) requires a time attribute as the primary sort column. This
is quite limiting for ad-hoc querying. It seems I can use the DataSet API to
obtain a globally sorted output on an arbitrary column, but that will force
me to use the older Flink planner.

Specifically, I am looking for guidance from the community on the following
questions -

   1. Is it possible to obtain a globally sorted output on DataStreams on
   an arbitrary sort column?
   2. What are the tradeoffs in using DataSet vs DataStream in performance,
   long term support, etc?
   3. Is there any other way to address this issue?

Regards,
Satyam


Re: Sorting Bounded Streams

2020-05-30 Thread Satyam Shekhar
Thanks for your reply, Benchao Li.

While I can use the Blink planner in batch mode, I'd still have to work
with DataSet. Based on my limited reading it appears to me that DataStream
is being extended to support both batch and streaming use-cases with the
`isBounded` method in the StreamTableSource interface. Is that correct?

Is working with DataSet the recommended approach for the long term? Are
there any performance implications for that decision?

Regards,
Satyam


On Fri, May 29, 2020 at 9:01 PM Benchao Li  wrote:

> Hi Satyam,
>
> Are you using blink planner in streaming mode? AFAIK, blink planner in
> batch mode can sort on arbitrary columns.
>
>
> --
>
> Best,
> Benchao Li
>


Table Environment for Remote Execution

2020-06-02 Thread Satyam Shekhar
Hello,

I am running into a very basic problem while working with Table API. I wish
to create a TableEnvironment connected to a remote environment that uses
Blink planner in batch mode. Examples and documentation I have come across
so far recommend the following pattern to create such an environment -

var settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build();
var tEnv = TableEnvironment.create(settings);

The above configuration, however, does not connect to a remote environment.
Tracing code in TableEnvironment.java, I see the following method in
BlinkExecutorFactory.java that appears to be relevant -

Executor create(Map<String, String>, StreamExecutionEnvironment);

However, it seems to be only accessible through the Scala bridge. I can't
seem to find a way to instantiate a TableEnvironment that takes
StreamExecutionEnvironment as an argument. How do I achieve that?

Regards,
Satyam


Re: Table Environment for Remote Execution

2020-06-03 Thread Satyam Shekhar
Thanks for the reply, Godfrey.

I would also love to learn the reasoning behind that limitation.

For more context, I am building a Java application that receives some user
input via a GRPC service. The user's input is translated to SQL that may be
executed in streaming or batch mode based on custom business logic, and is
then submitted to Flink for execution. In my current setup, I create an
ExecutionEnvironment, register sources, and execute the corresponding SQL.
I was able to achieve the desired behavior with StreamTableEnvironment but
it has limitations around supported SQL in batch mode.

While invoking the CLI from a Java program might be a solution, it doesn't
feel like the most natural solution for the problem. Are there other ways
to address this?

Regards,
Satyam

On Wed, Jun 3, 2020 at 12:50 AM godfrey he  wrote:

> Hi Satyam,
>
> for blink batch mode, only TableEnvironment can be used,
> and TableEnvironment does not take StreamExecutionEnvironment as an
> argument. Instead, a StreamExecutionEnvironment instance is created
> internally.
>
> back to your requirement, you can build your table program as user jar,
> and submit the job through flink cli [1] to remote environment.
>
> Bests,
> Godfrey
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html
>
>
>


Re: Table Environment for Remote Execution

2020-06-03 Thread Satyam Shekhar
Thanks, Jark & Godfrey.

The workaround was successful.

I have created the following ticket to track the issue -
https://issues.apache.org/jira/browse/FLINK-18095

Regards,
Satyam

On Wed, Jun 3, 2020 at 3:26 AM Jark Wu  wrote:

> Hi Satyam,
>
> In the long term, TableEnvironment is the entry point for pure Table/SQL
> users. So it should have all the ability of StreamExecutionEnvironment.
> I think remote execution is a reasonable feature, could you create a JIRA
> issue for this?
>
> As a workaround, you can construct `StreamTableEnvironmentImpl` yourself
> via its constructor; it can support batch mode
> and StreamExecutionEnvironment.
>
> Best,
> Jark
>
>


UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hello,

I am using Flink as the query engine to build an alerting/monitoring
application. One of the use cases in our product requires continuously
tracking and charting the output of an aggregate-only SQL query,
for example, select sum(revenue) from lineorder. A desirable property of
the Flink job's output for such a query is that there is always exactly 1
row in the result set (or that the number of rows does not fall to 0 due to
retractions of previous output). In other words, I need upsert-like
semantics for the output of the query.
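
To illustrate the difference I care about, here is a small plain-Java sketch (not Flink code; the class and method names are hypothetical). It assumes a retract stream delivers each update as a delete followed by an insert, and that an upsert sink with an empty key overwrites its single row in place.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class SingleRowSinkSketch {
    // Row counts a client could observe after each message of a retract
    // stream: every update after the first is a delete, then an insert.
    static List<Integer> retractCounts(long[] sums) {
        List<Integer> counts = new ArrayList<>();
        int rows = 0;
        for (int i = 0; i < sums.length; i++) {
            if (i > 0) {
                counts.add(--rows); // retract the previous sum
            }
            counts.add(++rows); // insert the new sum
        }
        return counts;
    }

    // Row counts with upsert semantics and an empty key: every message
    // overwrites the one row stored under the empty key.
    static List<Integer> upsertCounts(long[] sums) {
        Map<List<Object>, Long> table = new HashMap<>();
        List<Integer> counts = new ArrayList<>();
        for (long sum : sums) {
            table.put(Collections.emptyList(), sum);
            counts.add(table.size());
        }
        return counts;
    }
}
```

For three successive sums, the retract model passes through 0 rows between updates, while the upsert model stays at 1 row throughout, which is the behavior the charting client needs.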

I was hopeful after reading the comments in UpsertStreamTableSink.java that
this condition is accounted for in the implementation. However, a pipeline
with the above query writing to a concrete UpsertStreamTableSink fails with
the following error - "UpsertStreamTableSink requires that Table has a
full primary keys if it is updated." Here are the relevant comments from
UpsertStreamTableSink.java for reference -

```
Configures the unique key fields of the {@link Table} to write. The method
is called after {@link TableSink#configure(String[], TypeInformation[])}.

The keys array might be empty, if the table consists of a single
(updated) record. If the table does not have a key and is append-only, the
keys attribute is null.

@param keys the field names of the table's keys, an empty array if the
table has a single row, and null if the table is append-only and has no key.
void setKeyFields(String[] keys);
```

The code in StreamExec(Legacy)Sink.scala appears to be consistent with the
observed failure and does not match the comment about "empty key array if
the table consists of a single record".

With that context, I have the following questions -

1. Is the UpsertStreamTableSink expected to consume the output of such
aggregate only queries? Or is my interpretation of the code and comment
wrong and I have misconfigured UpsertStreamTableSink?
2. If the answer to (1) is no, are there any recommended patterns for
solving this use-case such that the client never observes an empty result
set for the output of this query?

Regards,
Satyam
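
For reference, the single-row case described in the javadoc can be pictured
with a small model that has no Flink dependency. This is only a sketch: the
class KeyedUpsertBuffer and its methods are hypothetical names, not Flink
code. It assumes that an empty key array means every update addresses the
same row, so the result never grows beyond one row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of upsert semantics; not part of Flink.
class KeyedUpsertBuffer {
    // Positions of the key fields in a row; an empty array models a single-row table.
    private final int[] keyIndexes;
    private final Map<List<Object>, Object[]> rows = new HashMap<>();

    KeyedUpsertBuffer(int... keyIndexes) {
        this.keyIndexes = keyIndexes;
    }

    // Extracts the key of a row. With no key fields, every row maps to the same empty key.
    private List<Object> keyOf(Object[] row) {
        List<Object> key = new ArrayList<>();
        for (int index : keyIndexes) {
            key.add(row[index]);
        }
        return key;
    }

    // Inserts the row, or overwrites the existing row that has the same key.
    void upsert(Object[] row) {
        rows.put(keyOf(row), row);
    }

    // Removes the row with the same key, if present.
    void delete(Object[] row) {
        rows.remove(keyOf(row));
    }

    int size() {
        return rows.size();
    }

    // Looks up the current row for the given key values (no values for the single-row case).
    Object[] get(Object... key) {
        return rows.get(Arrays.asList(key));
    }
}
```

With no key indexes, two successive upserts of sum(revenue) leave exactly one
row holding the latest value, which is the behaviour the use case needs.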


Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-05 Thread Satyam Shekhar
Hey Arvid,

Thanks for the reply.

As you suggested, rewriting the query to add a dummy output column and a
group by clause - "select 1, sum(revenue) from lineorder group by 1" - does
add a unique key column to the output, and the pipeline succeeds.

However, the application may get arbitrary SQL from the upstream server.
This makes the solution tricky - I'd have to change the query to add a dummy
grouping key to every grouping node in the query and a projection node to
drop the dummy key. I can try to account for this upstream (in the query
generation layer), but I would prefer to have it solved within the
execution engine itself.

Regards,
Satyam

On Fri, Jun 5, 2020 at 11:59 AM Arvid Heise  wrote:

> Hi Satyam,
>
> you are right, there seems to be a disconnect between javadoc and
> implementation. Jark probably knows more.
>
> In your case, couldn't you just add a dummy column containing a constant
> key?
>
> select 'revenue' AS name, sum(revenue) from lineorder
>
> and then set the dummy field as PK?
>


Re: UpsertStreamTableSink for Aggregate Only Query

2020-06-08 Thread Satyam Shekhar
Hi Jark,

I wish to atomically update the destination with remove-insert. To pick
that strategy, I need some "hint" from Flink that the output is a global
aggregation with no grouping key, and that appends should overwrite the
previous value.

I am also exploring handling the issue in the upstream server (in query
generation layer) where I have this knowledge based on the context (similar
to what Arvid suggested). I may be able to get around this problem by
handling it upstream.

Regards,
Satyam
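
One way to picture the upstream handling is a helper in the query generation
layer that nests an arbitrary single-row query and adds a constant key
column, in the spirit of Arvid's suggestion. This is a sketch under
assumptions: DummyKeyWrapper, withDummyKey and the alias t are hypothetical
names, and the thread does not confirm that the planner derives a primary
key from a constant column added this way.

```java
// Hypothetical helper for the query generation layer; not a Flink API.
class DummyKeyWrapper {
    // Nests the given query as a subquery and prepends a constant key column.
    static String withDummyKey(String query, String keyColumn) {
        String trimmed = query.trim();
        // Drop a trailing semicolon so the query can be used as a subquery.
        if (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        return "SELECT 'dummy' AS `" + keyColumn + "`, t.* FROM (" + trimmed + ") AS t";
    }
}
```

The destination writer can then drop the key column again before the result
is shown to the user.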

On Sun, Jun 7, 2020 at 8:05 PM Jark Wu  wrote:

> Hi Satyam,
>
> Currently, `UpsertStreamTableSink` requires the query to contain a primary
> key, and the key will be set to `UpsertStreamTableSink#setKeyFields`.
> If there is no primary key in the query, an error will be thrown as you
> can see.
>
> It should work for all group by queries (as long as the group by keys
> are not projected away after the aggregation).
> Global aggregation is special: it doesn't have a primary key. But an
> upsert sink requires a primary key, otherwise it doesn't know which row to
> update.
> How would you write such a result into an external database if there is
> no primary key? Would you write the rows in append fashion, or in
> remove-insert fashion?
>
> Best,
> Jark
>
>
> On Sat, 6 Jun 2020 at 04:32, Arvid Heise  wrote:
>
>> Instead of changing the query, I used to embed the query in a larger
>> context for similar work.
>>
>> So if you get an arbitrary query X which produces exactly one result
>> (e.g. X = select sum(revenue) from lineorder group by 1) then you can
>> craft a query where you add a dummy pk to the result.
>>
>> Table original = env.sqlQuery(X);
>> Table withDummy = original.select("'dummy' as pk, *");
>>

Timestamp data type mismatch

2020-06-08 Thread Satyam Shekhar
Hello,

I am running into an issue while trying to create a TableSource with
rowtime attribute. I have configured TableSource to return produced
type of Row(DataTypes.BIGINT,
DataTypes.TIMESTAMP) via DataType TableSource::getProducedDataType(). The
returned DataStream has a flatmap operator that implements
ResultTypeQueryable and returns typeinfo RowTypeInfo({Types.LONG,
Types.SQL_TIMESTAMP}, {...}).

Queries on this table source fail with the following error -

TableSource of type io.netspring.blaze.eval.BlazeTableSource returned a
DataStream of data type Row(C0: Long, blaze_itime: Timestamp) that does not
match with the data type ROW<`C0` BIGINT, `blaze_itime` TIMESTAMP(6)>
declared by the TableSource.getProducedDataType() method. Please validate
the implementation of the TableSource.

Queries on the DataStream without the timestamp column work. I was also able
to make it work, to some extent, with the timestamp column by changing the
DataStream to return Types.LOCAL_DATE_TIME. However, I am curious to know why
Types.SQL_TIMESTAMP does not match DataTypes.TIMESTAMP.

Regards,
Satyam


Re: Timestamp data type mismatch

2020-06-08 Thread Satyam Shekhar
Thanks Dawid for the explanation.

Your suggested approach makes things work.

Regards,
Satyam
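
The precision difference Dawid describes can be reproduced with the JDK
alone. The sketch below is not Flink code: TimestampPrecisionSketch is a
hypothetical name, and the round trip through a long is only a stand-in for
a serializer that stores millis since epoch.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Hypothetical illustration using only JDK classes; not Flink's serializer.
class TimestampPrecisionSketch {
    // Models writing a java.sql.Timestamp as epoch milliseconds and reading it back.
    static Timestamp roundTripThroughMillis(Timestamp original) {
        return new Timestamp(original.getTime());
    }

    // Fractional seconds (in nanos) that survive the millisecond round trip.
    static int nanosAfterMillisRoundTrip(String literal) {
        return roundTripThroughMillis(Timestamp.valueOf(literal)).getNanos();
    }

    // Fractional seconds (in nanos) kept by a LocalDateTime parsed from the same literal.
    static int nanosInLocalDateTime(String literal) {
        return LocalDateTime.parse(literal.replace(' ', 'T')).getNano();
    }
}
```

For the literal "2020-06-08 10:08:00.123456", the millisecond round trip
keeps only the .123 part, while LocalDateTime keeps all six fractional
digits. That matches a precision of 3 versus 6.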

On Mon, Jun 8, 2020 at 1:18 AM Dawid Wysakowicz 
wrote:

> Hi Satyam,
>
> The thing is that Types.SQL_TIMESTAMP uses java.sql.Timestamp and
> serializes it as a long (millis since epoch), and thus has millisecond
> precision. The default precision for DataTypes.TIMESTAMP is 6 and the
> default bridging class is LocalDateTime.
>
> It should work if you return
> DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class) in the
> getProducedDataType.
>
> Best,
>
> Dawid


Error reporting for Flink jobs

2020-06-28 Thread Satyam Shekhar
Hello,

I am using Flink as the query engine for running SQL queries on both batch
and streaming data. I use the Blink planner in batch and streaming mode
respectively for the two cases.

In my current setup, I execute the batch queries synchronously via the
StreamTableEnvironment::execute method. The job uses an OutputFormat to
consume results in a StreamTableSink and send them to the user. If there is
an error/exception in the pipeline (possibly due to user code), it is not
reported to the OutputFormat or the Sink. If an error occurs after the
invocation of the write method on the OutputFormat, the implementation may
falsely assume that the result is successful and complete, since close is
called in both success and failure cases. I can work around this by
checking for exceptions thrown by the execute method, but that adds extra
latency due to the job teardown cost.

A similar problem also exists for streaming jobs. In my setup, streaming
jobs are executed asynchronously via
StreamExecutionEnvironment::executeAsync. Since the sink interface has no
methods to receive errors in the pipeline, the user code has to periodically
track and manage persistent failures.

Have I missed something in the API? Or is there some other way to get
access to the error status in user code?

Regards,
Satyam
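
To make the gap concrete, here is a sketch of the kind of hook that would
avoid polling. It does not use any Flink type: the job outcome is modelled
as a plain CompletableFuture, and ResultTracker is a hypothetical class.
Whether and how a given Flink version exposes such a future for a submitted
job is an assumption to verify against its API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model: the job is represented by a CompletableFuture, not a Flink API type.
class ResultTracker {
    enum Status { RUNNING, SUCCEEDED, FAILED }

    private final AtomicReference<Status> status = new AtomicReference<>(Status.RUNNING);
    private final AtomicReference<Throwable> failure = new AtomicReference<>();

    // Registers a callback so that a failure reaches user code without periodic polling.
    void watch(CompletableFuture<?> jobResult) {
        jobResult.whenComplete((ignored, error) -> {
            if (error != null) {
                failure.set(error);
                status.set(Status.FAILED);
            } else {
                status.set(Status.SUCCEEDED);
            }
        });
    }

    // The result sent to the user should be treated as complete only when this is SUCCEEDED.
    Status status() {
        return status.get();
    }

    Throwable failure() {
        return failure.get();
    }
}
```

With something like this, a close call on the OutputFormat alone would not
be taken as proof of success; the consumer would also check the tracked
status.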


Colocating Compute

2020-07-29 Thread Satyam Shekhar
Hello,

I am using Flink v1.10 in a distributed environment to run SQL queries on
batch and streaming data.

In my setup, data is sharded and distributed across the cluster. Each shard
receives streaming updates from some external source. I wish to minimize
data movement during query evaluation for performance reasons. For that, I
need some construct to advise the Flink planner to bind each split (shard)
to the host where it is located.

I have come across InputSplitAssigner which gives me levers to influence
compute colocation for batch queries. Is there a way to do the same for
streaming queries as well?

Regards,
Satyam
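
The colocation I am after can be sketched as a local-first assignment
policy, the same idea a host-aware InputSplitAssigner follows for batch
jobs. The code below is a standalone model, not a Flink implementation:
LocalFirstSplitAssigner, addSplit and nextSplit are hypothetical names, and
splits and hosts are plain strings.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of locality-aware split assignment; names are illustrative, not Flink API.
class LocalFirstSplitAssigner {
    // Unassigned splits, grouped by the host that stores them.
    private final Map<String, Deque<String>> splitsByHost = new LinkedHashMap<>();

    void addSplit(String splitId, String host) {
        splitsByHost.computeIfAbsent(host, h -> new ArrayDeque<>()).add(splitId);
    }

    // Returns a split stored on the requesting host if one is left, otherwise any
    // remaining split (a remote read), or null when everything has been assigned.
    String nextSplit(String requestingHost) {
        Deque<String> local = splitsByHost.get(requestingHost);
        if (local != null && !local.isEmpty()) {
            return local.poll();
        }
        for (Deque<String> remote : splitsByHost.values()) {
            if (!remote.isEmpty()) {
                return remote.poll();
            }
        }
        return null;
    }
}
```

A reader on hostB first receives the shards stored on hostB and only then
falls back to shards from other hosts.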


Re: Colocating Compute

2020-07-30 Thread Satyam Shekhar
Hi Dawid,

I am currently on Flink v1.10. Do streaming pipelines support unbounded
InputFormat in v1.10? My current setup uses SourceFunction for streaming
pipeline and InputFormat for batch queries.

I see that the documentation for Flink v1.11 describes the concepts of Split,
SourceReader, and SplitEnumerator to enable streaming queries on unbounded
splits. Is that the direction you were pointing to?

Regards,
Satyam

On Thu, Jul 30, 2020 at 6:03 AM Dawid Wysakowicz 
wrote:

> Hi Satyam,
>
> I think you can use the InputSplitAssigner also for streaming pipelines
> through an InputFormat. You can use
> StreamExecutionEnvironment#createInput or for SQL you can write your
> source according to the documentation here:
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sourceSinks.html#dynamic-table-source
>
> If you do not want to use an InputFormat I think there is no easy way to
> do it now.
>
> Best,
>
> Dawid
>


Editing Rowtime for SQL Table

2020-08-31 Thread Satyam Shekhar
Hello,

I use Flink for continuous evaluation of SQL queries on streaming data. One
of the use cases requires us to run recursive SQL queries. I am unable to
find a way to edit the rowtime attribute of the intermediate result table.

For example, let's assume that there is a table T0 with schema -
root
 |-- str1: STRING
 |-- int1: BIGINT
 |-- utime: TIMESTAMP(3)
 |-- itime: TIMESTAMP(3) *ROWTIME*

Now, let's create a view V0 -
var V0 = tEnv_.sqlQuery("select str1, int1, utime, itime from T0");

I wish to change the rowtime of V0 from itime to utime. I tried doing -

V0 = V0.addOrReplaceColumns($("utime").as("utime").rowtime());

but ran into the following exception -

org.apache.flink.table.api.ValidationException: Window properties can only be used on windowed tables.
at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:854) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.operations.utils.OperationTreeBuilder$NoWindowPropertyChecker.visit(OperationTreeBuilder.java:843) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:39) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.expressions.UnresolvedCallExpression.accept(UnresolvedCallExpression.java:132) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.operations.utils.OperationTreeBuilder.lambda$project$1(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at java.base/java.util.ArrayList.forEach(ArrayList.java:1540) ~[na:na]
at org.apache.flink.table.operations.utils.OperationTreeBuilder.project(OperationTreeBuilder.java:158) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.operations.utils.OperationTreeBuilder.addColumns(OperationTreeBuilder.java:207) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableImpl.addColumnsOperation(TableImpl.java:475) ~[flink-table-api-java-1.11.1.jar:1.11.1]
at org.apache.flink.table.api.internal.TableImpl.addOrReplaceColumns(TableImpl.java:459) ~[flink-table-api-java-1.11.1.jar:1.11.1]

Any guidance on how to address this?

Regards,
Satyam


Re: Editing Rowtime for SQL Table

2020-09-01 Thread Satyam Shekhar
Thanks for your replies Matthias and Timo.

Converting the Table to a DataStream, assigning a new watermark and rowtime
attribute, and converting back to a Table makes sense. One challenge with
that approach is that the Table to DataStream conversion could emit a
retract stream. However, I think that can now be handled with the new
TableSource API (in 1.11), which allows a TableSource to emit retractions.

I'll try this approach when I migrate to the new API and report back.

Regards,
Satyam
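
For context on why new watermarks are needed: watermarks computed on itime
say nothing about how far utime has progressed, so they have to be derived
again from the new column. The sketch below models a simple
bounded-out-of-orderness policy in plain Java. BoundedLatenessWatermark is a
hypothetical class, not Flink's implementation, and the fixed bound is an
assumption about the data.

```java
// Hypothetical model of a bounded-out-of-orderness watermark; not Flink's implementation.
class BoundedLatenessWatermark {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestampSeen = Long.MIN_VALUE;

    BoundedLatenessWatermark(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    // Called with the new rowtime value (e.g. utime in epoch millis) of each record.
    void onEvent(long timestampMillis) {
        maxTimestampSeen = Math.max(maxTimestampSeen, timestampMillis);
    }

    // The watermark trails the largest timestamp seen so far by the allowed bound.
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no events yet
        }
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    // In this model, records at or below the watermark count as late.
    boolean isLate(long timestampMillis) {
        return timestampMillis <= currentWatermark();
    }
}
```

The watermark only moves forward with the largest utime seen, so an
out-of-order record does not pull it back.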

On Tue, Sep 1, 2020 at 4:46 AM Timo Walther  wrote:

> Hi Satyam,
>
> Matthias is right. A rowtime attribute cannot be modified and needs to be
> passed "as is" through the pipeline. The only exceptions are if a newer
> rowtime is offered such as `TUMBLE_ROWTIME` or `MATCH_ROWTIME`. In your
> case, you need to define utime as the time attribute. If this is not
> possible, you either express the computation in regular SQL (with
> non-streaming optimizations) or you go to the DataStream API to prepare the
> table (assign a new watermark and StreamRecord timestamp there) and go back
> to the Table API.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On Tue, Sep 1, 2020 at 11:40 AM Matthias Pohl 
> wrote:
>
>> Hi Satyam,
>> Thanks for your post. Unfortunately, it looks like you cannot change the
>> rowtime column here. The rowtime is strongly coupled with the Watermarks
>> feature. By changing the rowtime column we cannot ensure that the
>> watermarks are still aligned as Fabian mentioned in [1].
>>
>> @Timo Walther  : Could you verify my findings?
>>
>> Best,
>> Matthias
>>
>> [1]
>> https://stackoverflow.com/questions/52784089/flink-table-sql-api-modify-rowtime-attribute-after-session-window-aggregation
>>

Backquote in SQL dialect

2020-09-17 Thread Satyam Shekhar
Hello,

I have been happily using Flink as the SQL engine for running streaming and
batch queries.

I am curious to understand the rationale behind Flink using backticks (`)
for quoting purposes instead of standard double quotes ("). Is double-quote
reserved for some other usage?

Regards,
Satyam
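
For what it is worth, this is roughly how our query generation layer quotes
identifiers today. It is a sketch, not Flink code: BacktickQuoting is a
hypothetical helper, and it assumes that a backtick inside an identifier is
escaped by doubling it.

```java
// Hypothetical helper for generating quoted identifiers; not a Flink API.
class BacktickQuoting {
    // Wraps the identifier in backticks, doubling any backtick it contains (assumed escape rule).
    static String quote(String identifier) {
        return "`" + identifier.replace("`", "``") + "`";
    }
}
```

For example, a column named timestamp is emitted as `timestamp`.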