Hi Jon,
I would not recommend using the configuration parameter. It is not
deprecated yet but can be considered legacy code from before we reworked
the type system.
Regards,
Timo
On 16.10.20 13:23, Kurt Young wrote:
Yes, I think this is a bug, feel free to open a jira and a pull request.
Hi Manas,
you need to make sure to differentiate between what Flink calls
"pre-flight phase" and "cluster phase".
The pre-flight phase is where the pipeline is constructed and all
functions are instantiated. They are then later serialized and sent to
the cluster.
If you are reading your pro
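To illustrate the distinction, here is a minimal sketch (class and field names are made up): the constructor runs during the pre-flight phase on the client, while open() runs on the cluster after the function has been deserialized.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class EnrichingMapper extends RichMapFunction<String, String> {

    // assigned during the pre-flight phase; must be serializable to reach the cluster
    private final String prefix;

    // non-serializable helpers should be created in open(), not in the constructor
    private transient StringBuilder buffer;

    public EnrichingMapper(String prefix) {
        // runs on the client while the pipeline is constructed
        this.prefix = prefix;
    }

    @Override
    public void open(Configuration parameters) {
        // runs on the task managers, once per parallel instance
        this.buffer = new StringBuilder();
    }

    @Override
    public String map(String value) {
        buffer.setLength(0);
        return buffer.append(prefix).append(value).toString();
    }
}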
Hi,
thanks for letting us know about this shortcoming.
I will link someone from the runtime team in the JIRA issue. Let's
continue the discussion there.
Regards,
Timo
On 22.10.20 05:36, chenkaibit wrote:
Hi everyone:
I encountered this exception when a hard disk was damaged:
https://issues.apache
Hi Theo,
this is indeed a difficult use case. The KafkaDeserializationSchema is
actually meant mostly for deserialization and should not contain more
complex logic such as joining with a different topic, since that would
make the KafkaDeserializationSchema stateful.
But in your use case, I see no better
CONFIG_TOPIC = s.get("CONFIG_TOPIC");
CONFIG_KAFKA = s.get("CONFIG_KAFKA");
}
}
This produces the same issue. With the easier solution that you
listed, are you implying I use multiple instances or a singleton
pattern of some sort?
On Thu, Oct 22, 2020 at 1:2
Hi,
sorry for the late reply. I assume the problem was in the
`tEnv.toAppendStream(result, Order.class).print();` call, right?
You can also find a new example here:
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/Gett
-flight and cluster phases? I
couldn't find anything on http://ci.apache.org/projects/flink and I think this behaviour should
be documented as a warning/note.
On Thu, Oct 22, 2020 at 6:44 PM Timo Walther <twal...@apache.org> wrote:
Hi Rex,
sorry for the late reply. POJOs will have much better support in the
upcoming Flink versions because they have been fully integrated with the
new table type system mentioned in FLIP-37 [1] (e.g. support for
immutable POJOs and nested DataTypeHints etc).
For queries, scalar, and table
Hi Yuval,
this error is indeed weird.
@Aljoscha: I think Calcite uses apiguardian.
When I saw the initial error, it looked like there are different Apache
Calcite versions in the classpath. I'm wondering if this is a pure SBT
issue because I'm sure that other users would have reported this er
n the next version with this change might
release?
On Wed, Nov 4, 2020 at 2:44 AM Timo Walther <twal...@apache.org> wrote:
Hi Rex,
sorry for the late reply. POJOs will have much better support in the
upcoming Flink versions because they have been fully integrate
gular one?
Thanks
On Fri, Oct 9, 2020 at 7:55 AM Timo Walther <twal...@apache.org> wrote:
Hi Rex,
let me copy paste my answer from a similar thread 2 months ago:
Hi,
this might be helpful as well:
https://lists.apache.o
Hi Rex,
as far as I know, the IN operator only works on tables or a list of
literals, where the latter is just a shortcut for multiple OR
operations. I would just go with a UDF for this case. In SQL you could
do an UNNEST to convert the array into a table and then use the IN
operator. But
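A rough sketch of the UNNEST variant, wrapped in a `sqlQuery` call (table and column names are made up):

Table result = tEnv.sqlQuery(
    "SELECT o.id, t.tag " +
    "FROM Orders AS o " +
    "CROSS JOIN UNNEST(o.tags) AS t (tag) " +
    "WHERE t.tag IN ('a', 'b', 'c')");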
Hi Ori,
we might support SQL expressions soon in Table API. However, we might
not support aggregate functions immediately. I would recommend using
`sqlQuery` for now.
The following is supported:
val table = tenv.fromDataStream(stream)
val newTable = tenv.sqlQuery(s"SELECT ... FROM $table")
Sorry for jumping in so late. I think Dawid gave a nice summary.
As he said, the DataStream <> Table integration is still
under development. Until then I would suggest option 3), which means
don't upgrade the functions and use the old registration function
`registerFunction`.
Hi Ruben,
by looking at the code, it seems you should be able to do that. At least
for batch workloads we are using
org.apache.flink.formats.csv.CsvFileSystemFormatFactory.CsvInputFormat
which is a FileInputFormat that supports the mentioned configuration option.
The problem is that this mig
Hi Felipe,
by non-deterministic, Jark meant that you never know whether the mini batch
timer (every 3 s) or the mini batch threshold (e.g. 3 rows) fires the
execution. This depends on how fast records arrive at the operator.
In general, processing time can be considered non-deterministic, because
1
Hi,
unfortunately, we currently don't provide any upgrading guarantees for
SQL. In theory we could add a possibility to add operator uids; however,
this will not help much because the underlying SQL operators, or rather the
optimization rules that create a smarter pipeline, could change the
entire
Hi,
are you using the SQL jars or do you build the dependency jar file
yourself? It might be the case that the SQL jar for Kafka does not
include this module as the exception indicates. You might need to build
a custom Kafka jar with Maven and all dependencies you need. (including
correct MET
Hi Rex,
the classes mentioned in the documentation such as `int` and
`java.lang.Integer` are only used when you leave the SQL world to a UDF
or to a Java implementation in a sink.
But as a SQL user you only need to pay attention to the logical data
type. Those must match entirely or be a sup
Hi Fuyao,
sorry for not replying earlier.
You posted a lot of questions. I scanned the thread quickly, let me try
to answer some of them and feel free to ask further questions afterwards.
"is it possible to configure the parallelism for Table operation at
operator level"
No this is not pos
Hi Simone,
if you are just executing DataStream pipelines locally in your IDE while
prototyping, you should be able to use `DataStream#print()`, which just
prints to standard out [1] (it might be hidden between the log messages).
For debugging locally, you can also just set breakpoints in your
Hi Klemens,
what you are observing are reasons why event-time should be preferred
over processing-time. Event-time uses the timestamp of your data while
processing-time is too basic for many use cases. Especially when you want to
reprocess historic data, you want to do that at full speed instead of
Hi Saksham,
could you tell us a bit more about your deployment and where you run Flink?
This seems to be the root exception:
2020-11-24 11:11:16,296 ERROR
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerStdoutFileHandler
[] - Failed to transfer file from TaskExecutor
f0dc0ae680e65a
integrate it with an Apache Kafka Service. Output is written to a
Postgres-Database-System.
I'll have a look at your proposal and let you know if it worked, after
having finished a few prerequisite parts.
Regards
Klemens
Am 24.11.20 um 12:59 schrieb Timo Walther:
Hi Klemens,
wha
I agree with Dawid.
Maybe one thing to add is that reusing parts of the pipeline is possible
via StatementSets in TableEnvironment. They allow you to add multiple
queries that consume from a common part of the pipeline (for example a
common source). But all of that is compiled into one big job
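A minimal sketch of a statement set (table names are assumed to exist): both INSERT statements share the common source and are submitted together as one job.

StatementSet set = tEnv.createStatementSet();
set.addInsertSql("INSERT INTO SinkA SELECT id, name FROM CommonSource");
set.addInsertSql("INSERT INTO SinkB SELECT id, COUNT(*) FROM CommonSource GROUP BY id");
set.execute();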
Hi,
one piece of advice I can give you is to check out the code and execute some of
the examples in debugging mode. Especially within Flink's functions, e.g.
MapFunction or ProcessFunction, you can set a breakpoint and look at the
stack trace. This gives you a good overview of the Flink stack in
general.
ic-watermarkgenerator
Best,
Fuyao
On 11/20/20 08:55, Timo Walther wrote:
Hi Fuyao,
sorry for not replying earlier.
You posted a lot of questions. I scanned the thread quickly, let me
try to answer some of them and feel free to ask further questions
afterwards.
"is it possible to con
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/queries.html#joins)
Thanks!
Fuyao
On Tue, Nov 24, 2020 at 9:06 AM Timo Walther <twal...@apache.org> wrote:
Hi Fuyao,
great that you could make progre
Hi,
first of all, we don't support ListTypeInfo in Table API. Therefore, it
is treated as a RAW type. The exception during exception creation is a
bug that should be fixed in a future version. But the mismatch is valid:
ARRAY is not a list type info but `Types.OBJECT_ARRAY(Types.INT)`.
Can you
Hi Marco,
sorry for the late reply. Have you looked into user-defined aggregate
functions for SQL? I think your requirements can be easily implemented
there. You can declare multiple aggregate functions per window. There is
also the built-in function LISTAGG that might help for your use case.
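A small sketch of LISTAGG in a windowed aggregation (table and column names are made up, and `ts` is assumed to be a time attribute):

Table result = tEnv.sqlQuery(
    "SELECT userId, LISTAGG(item, ',') AS items " +
    "FROM Orders " +
    "GROUP BY userId, TUMBLE(ts, INTERVAL '1' HOUR)");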
Hi,
first, we should clarify "continue to be put into the Flink table": A
Flink Table object does not physically store the data. It is basically a
view that contains a transformation pipeline.
When you are calling `collect()` the pipeline is executed and all
results from the cluster are stre
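A minimal sketch of what that looks like (Orders is assumed to be a registered table): `execute()` triggers the pipeline and `collect()` streams the results back to the client.

// needs org.apache.flink.types.Row and org.apache.flink.util.CloseableIterator
Table result = tEnv.sqlQuery("SELECT name, COUNT(*) AS cnt FROM Orders GROUP BY name");
try (CloseableIterator<Row> it = result.execute().collect()) {
    while (it.hasNext()) {
        System.out.println(it.next());
    }
}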
Hi Marco,
when you say "database", are you referring to the JDBC connector or would
you like to perform a JDBC query within some UDF? In the latter case, I
would recommend using Flink's ProcessFunction because you can store the
cache hits in state (and thus keep them forever). SQL/Table API doe
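A rough sketch of that caching pattern (input/output types are simplified and queryDatabase is a placeholder for the actual JDBC lookup):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CachingLookup extends KeyedProcessFunction<String, String, String> {

    private transient ValueState<String> cached;

    @Override
    public void open(Configuration parameters) {
        cached = getRuntimeContext().getState(
            new ValueStateDescriptor<>("cache", String.class));
    }

    @Override
    public void processElement(String key, Context ctx, Collector<String> out) throws Exception {
        String value = cached.value();
        if (value == null) {
            value = queryDatabase(key); // only query on a cache miss
            cached.update(value);
        }
        out.collect(key + " -> " + value);
    }

    private String queryDatabase(String key) {
        return "value-for-" + key; // placeholder for the real JDBC call
    }
}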
Hi,
if you would like to dynamically adjust configuration of your streaming
job, it might be a good approach to consider the configuration as a
stream itself.
The connect() API can be used to connect a main stream with a control
stream. In any case the configuration should be persisted in st
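A minimal sketch of that pattern (the streams and the threshold semantics are made up; for fault tolerance the value should also be stored in Flink state):

// assumes env is a StreamExecutionEnvironment
DataStream<Long> mainStream = env.fromElements(1L, 15L, 3L, 42L);
DataStream<Long> controlStream = env.fromElements(10L);

DataStream<Long> filtered = mainStream
    .connect(controlStream)
    .flatMap(new CoFlatMapFunction<Long, Long, Long>() {
        private long threshold = Long.MAX_VALUE;

        @Override
        public void flatMap1(Long value, Collector<Long> out) {
            // main stream: apply the current configuration
            if (value >= threshold) {
                out.collect(value);
            }
        }

        @Override
        public void flatMap2(Long newThreshold, Collector<Long> out) {
            // control stream: update the configuration
            threshold = newThreshold;
        }
    });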
Hi,
I gave some answers in the other mail thread. One additional comment:
in general, I think even the configuration can be considered state in this
case. If state is not set, the job can be considered a fresh start.
Once the state is set, it would basically be just a configuration
update.
Hi,
Flink does not support time zones currently. However, all time
operations work on Java `long` values. It is up to the user what
this long value represents. It does not have to be UTC but can also be adjusted
for another time zone. Since the DataStream API supports arbitrary Java
objects, users can
Hi Dan,
the exception that you get is a very frequent limitation in Flink SQL at
the moment.
I tried to summarize the issue recently here:
https://stackoverflow.com/questions/64445207/rowtime-attributes-must-not-be-in-the-input-rows-of-a-regular-join-despite-usi/64500296#64500296
The query i
Hi Dan,
are you intending to use interval joins, regular joins, or a mixture of
both?
For regular joins you must make sure to cast the rowtime attribute to a
regular timestamp as early as possible. For interval joins, you need to make
sure that the rowtime attribute is unmodified.
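A small sketch of the cast for the regular join case (table and column names are made up); after the cast, `ts` is a plain TIMESTAMP and no longer a time attribute:

Table orders = tEnv.sqlQuery(
    "SELECT id, CAST(rowtime AS TIMESTAMP(3)) AS ts FROM Orders");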
Currently, I see
COALE
Hi Yuval,
the legacy type has no string representation that can be used in a SQL
DDL statement. The current string representation LEGACY(...) is only a
temporary workaround to persist the old types in catalogs.
Until FLIP-136 is fully implemented, toAppendStream/toRetractStream
support only
id type of
the table.
Hope that clarifies a bit; since the pipeline is rather complex I
can't really share an MVCE of it.
On Mon, Dec 28, 2020 at 11:08 AM Timo Walther <twal...@apache.org> wrote:
Hi Yuval,
the legacy type has no string representa
P-136 to
resolve the issue around legacy types? Will its implementation allow
registering LEGACY types? Or a new variation of them?
On Mon, Dec 28, 2020 at 12:45 PM Timo Walther <twal...@apache.org> wrote:
I would recommend using the old UDF stack for now.
Hi Dan,
are you sure that your watermarks are still correct during reprocessing?
As far as I know, idle state retention is not used for temporal joins.
The watermark indicates when state can be removed in this case.
Maybe you can give us some more details about which kind of temporal
join yo
Hi Marco,
nesting aggregate functions is not allowed in SQL. The exception could
be improved though. I guess the planner searches for a scalar function
called `MyUDTAGG` in your example and cannot find one.
Maybe the built-in function `COLLECT` or `LISTAGG` is what you are
looking for?
htt
, ie, SELECT FROM (SELECT FROM)?
On Jan 5, 2021, at 6:10 AM, Timo Walther wrote:
Hi Marco,
nesting aggregated functions is not allowed in SQL. The exception could be
improved though. I guess the planner searches for a scalar function called
`MyUDTAGG` in your example and cannot find one
Hi Aeden,
we updated the connector property design in 1.11 [1]. The old
translation layer exists for backwards compatibility and is indicated by
`connector.type=kafka`.
However, `connector = kafka` indicates the new property design and
`key.fields` is only available there. Please check all p
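A sketch of a DDL using the new property design (all option values are placeholders, and availability of `key.fields` depends on the Flink/connector version):

tEnv.executeSql(
    "CREATE TABLE orders (" +
    "  id STRING," +
    "  amount DOUBLE" +
    ") WITH (" +
    "  'connector' = 'kafka'," +
    "  'topic' = 'orders'," +
    "  'properties.bootstrap.servers' = 'localhost:9092'," +
    "  'key.fields' = 'id'," +
    "  'key.format' = 'raw'," +
    "  'value.format' = 'json'" +
    ")");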
tor-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java#L160
Does format.avro-schema need to be specified differently?
Thank you,
Aeden
On Thu, Jan 7, 2021 at 12:15 AM Timo Walther wrote:
Hi Aeden,
we updated the connector property design in 1.11 [1]
me in handy. I was looking through
docs hoping there was a way to still specify the schema with no luck.
Does such an option exist?
On Thu, Jan 7, 2021 at 2:33 AM Timo Walther <twal...@apache.org> wrote:
Hi Aeden,
`format.avro-schema` is not required anymore in the n
Hi,
it seems this is a bug that is located in the Apache Calcite code.
I will open an issue for it.
Thanks for reporting this.
Regards,
Timo
On 12.01.21 11:08, jy l wrote:
Hi:
Flink SQL throws an exception when filtering data.
Code:
def main(args: Array[String]): Unit = {
val env = StreamExecut
See here: https://issues.apache.org/jira/browse/FLINK-20942
On 12.01.21 16:04, Timo Walther wrote:
Hi,
it seems this is a bug that is located in the Apache Calcite code.
I will open an issue for it.
Thanks for reporting this.
Regards,
Timo
On 12.01.21 11:08, jy l wrote:
Hi:
Flink SQL
Hi Yuval,
could you share a reproducible example with us?
I see you are using SQL / Table API with a RAW type. I could imagine
that the KryoSerializer is configured differently when serializing and
when deserializing. This might be due to the `ExecutionConfig` not being shipped
(or copied) through the
Hi Robert,
could you send us the error/stacktrace that is printed?
An example how it should work is shown here:
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/UpdatingTopCityExample.java
https://github.c
ping
SlotPool. 2021-01-15 16:52:08,468 INFO
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] -
Disconnect job manager
0...@akka.tcp://flink@flink-jobmanager:6123/user/rpc/jobmanager_68
for job 84c9f12fe943bc7f32ee637666ed3bc1 from the resource m
maybe Godfrey in CC knows more?
On 15.01.21 18:10, Timo Walther wrote:
How are you running the Flink cluster? What is your deployment?
The exception clearly indicates that you found a bug. Could you open a
ticket in Flink's JIRA? We need details on how to reproduce it.
Thanks,
Timo
Hi Rex,
ListView and MapView have been part of Flink for years. However, they
were considered an internal feature and therefore not well
documented. MapView is used internally to make distinct aggregates work.
Because we reworked the type inference of aggregate functions, we also
added ba
Hi Aeden,
computed columns on a DataStream input are currently not supported. I am
currently working on making this possible. Have a look at FLIP-136 for
more information [1].
However, you can simply add a projection before you register a view:
tEnv.createTemporaryView("myTable", dataStream);
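One possible sketch of the workaround (field names are made up): register the stream as above and put the derived column into a view on top of it.

tEnv.createTemporaryView("myView",
    tEnv.sqlQuery("SELECT id, UPPER(name) AS name_upper FROM myTable"));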
Hi Dan,
currently, we cannot provide any savepoint guarantees between releases.
Because of the nature of SQL that abstracts away runtime operators, it
might be that a future execution plan will look completely different and
thus we cannot map state anymore. This is not avoidable because the
o
Mode).
What's the link to Flink's JIRA?
On Fri, Jan 15, 2021 at 12:19 PM Timo Walther <twal...@apache.org> wrote:
maybe Godfrey in CC knows more?
On 15.01.21 18:10, Timo Walther wrote:
> How are you running the Flink cluster? What is your deployment?
Hi Rex,
feel free to open an issue for this. I could also imagine that
checkpoints and savepoints will further diverge from each other and
having different timeouts might be reasonable.
Regards,
Timo
On 17.01.21 02:43, Rex Fenley wrote:
Thanks for the quick response.
Is this something tha
Hi Rex,
for questions like this, I would recommend checking out the source code
as well.
Search for subclasses of `StreamPartitioner`. For example, for keyBy
Flink uses:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitio
exactly once?
I'm inferring what is meant by dry out. Are there any documented
patterns for it? E.g. sending data to new kafka topics between releases?
On Mon, Jan 18, 2021, 01:04 Timo Walther <twal...@apache.org> wrote:
Hi Dan,
currently, we cannot provide
Hi Anton,
in many scenarios, it might be better to just use a ProcessFunction because
you might reach the limit of the built-in window functions very quickly.
ProcessFunction gives you full flexibility and you can put into state
what you like and set/fire timers when you think the time is appropr
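A rough sketch of that approach (types, names, and the 10-minute interval are made up; timestamps/watermarks are assumed to be assigned): per-key state plus a timer emulates a custom window.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CustomWindow extends KeyedProcessFunction<String, Long, Long> {

    private transient ValueState<Long> sum;

    @Override
    public void open(Configuration parameters) {
        sum = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        Long current = sum.value();
        if (current == null) {
            // first element for this key: schedule the result emission
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000);
            current = 0L;
        }
        sum.update(current + value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Long> out) throws Exception {
        out.collect(sum.value());
        sum.clear();
    }
}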
etween the two serializers Flink blows
up at runtime saying that the types don't match.
On Wed, Jan 13, 2021 at 1:19 PM Timo Walther <twal...@apache.org> wrote:
Hi Yuval,
could you share a reproducible example with us?
I see you are using SQL / Table API
I ported the code to the Flink code base because I had issues with SBT
and Scala 2.12. Note that it uses an older version of circe. I'm just pasting
it here in case it helps someone.
Regards,
Timo
On 18.01.21 13:51, Timo Walther wrote:
Hi Yuval,
thanks for sharing some code with us. I sc
Forgot to add the link:
https://github.com/twalthr/flink/tree/kryoBug_ser
Regards,
Timo
On 18.01.21 14:11, Timo Walther wrote:
I ported the code to the Flink code base. Because I had issues with SBT
and Scala 2.12. Note it uses an older version of circe. I'm just pasting
it here in ca
Hi,
in SQL event time is not part of the StreamRecord but a column in the
table. Thus, you need to extract it and specify the column name/location
when converting to Table API:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming/time_attributes.html#during-datastream-to
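A minimal sketch (field names are assumed, and the stream must already carry timestamps/watermarks); it uses a static import of org.apache.flink.table.api.Expressions.$:

Table table = tEnv.fromDataStream(
    dataStream,
    $("user"),
    $("amount"),
    $("rowtime").rowtime()); // exposes the StreamRecord timestamp as an event-time column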
Forget what I said before. I just tested the behavior and it seems there
is a bug in the conversion logic.
I opened https://issues.apache.org/jira/browse/FLINK-21013
Thanks for reaching out to us.
Regards,
Timo
On 18.01.21 15:37, Timo Walther wrote:
Hi,
in SQL event time is not part of the
18.01.21 18:28, Rex Fenley wrote:
Fascinating, do you have an estimate of what qualifies as a lot of data
and therefore when this should be used?
Thanks
On Mon, Jan 18, 2021 at 12:35 AM Timo Walther <twal...@apache.org> wrote:
Hi Rex,
ListView and MapView have been part
Hi Dylan,
I'm assuming you are using Flink 1.12 and the Blink planner?
Beginning from 1.12 you can use the "new" aggregate functions with a
better type inference. So TypeInformation will not be used in this stack.
I tried to come up with an example that should explain the rough design.
I wi
    .outputTypeStrategy { callContext =>
      val outputDataType = callContext.getArgumentDataTypes().get(0);
      Optional.of(outputDataType);
    }
    .build()
  }
}
Regards,
> classOf[LatestNonNull[String]])
don't make a difference. The generics will be type erased in bytecode
and only the class name matters.
Thanks,
Timo
On 21.01.21 11:36, Timo Walther wrote:
Hi Dylan,
thanks for the investigation. I can now also reproduce it in my code. Yes,
this is a bug.
://github.com/apache/flink/pull/13787
On 1/21/21, 8:50 AM, "Timo Walther" wrote:
I opened a PR. Feel free to try it out.
https://github.com/apache/flink/pull/14720
Btw:
>> env.createTemporarySystemFunction("LatestNonNullLong",
>
Hi,
I'm assuming you are using Flink 1.12?
The exception indicates that something is definitely going wrong with
the translation from Table API to optimizer nodes. We refactored a lot
of code in this region. I will investigate the issue and come back to
you once I have opened a ticket.
Thanks f
Hi Flavio,
FLIP-129 will update the connect() API with a programmatic way of
defining tables. In the API we currently only support the DDL via
executeSql.
I would recommend implementing the Catalog interface. This interface has
a lot of methods, but you only need to implement a couple of met
answer Timo! I think I'll wait for the
migration to finish before updating my code.
However, does the usage of a catalog solve the problem of CSV override
as well? I can't find a way to use INSERT OVERRIDE with a CSV sink using
the executeSql.
Best,
Flavio
On Mon, Jan 25, 2021 at 10:
Hi Gyula,
TableEnvironment.getConfig() offers a setPlannerConfig() method, and
org.apache.flink.table.planner.calcite.CalciteConfigBuilder helps in
creating an object that implements this interface. You should be able to
influence the Calcite parser config with this.
However, I'm not sure how well
Hi Yuval,
we should definitely find the root cause of this issue. If the
exception happens frequently, that helps to nail down the problem.
Have you tried to replace the JSON object with a regular String? If the
exception is gone after this change, I believe it must be the
serialization and not
t directly related to a Kryo serialization of the specific
underlying type (io.circe.Json), but something in the way it interacts
with BinaryRawValueData and writing out to the network buffer behind the
scenes.
On Thu, Jan 28, 2021 at 5:26 PM Timo Walther <twal...@apache.org> wrote:
I don't see any type equality issues, and
I see the same serializer being invoked for both serialization and
deserialization.
On Thu, Jan 28, 2021 at 5:51 PM Timo Walther <twal...@apache.org> wrote:
This is helpful information. So I guess the problem must be in th
Hi,
it is true that there is no dedicated machine learning library for
Flink. Flink is a general data processing framework. It allows you to
embed any available algorithm library within user-defined functions.
Flink's focus is on stream processing. There are not many dedicated
stream processi
milar.
On Thu, Jan 28, 2021 at 11:45 PM Dan Hill <quietgol...@gmail.com> wrote:
Is this savepoint recovery issue also true with the Flink Table
API? I'd assume so. Just doublechecking.
On Mon, Jan 18, 2021 at 1:58 AM Timo Walther <twal...@apache.org> wrote:
Hi,
sorry I forgot to further investigate this issue. It seems the last
refactoring of the code base caused this documented feature to break. I
opened an issue for it:
https://issues.apache.org/jira/browse/FLINK-21225
For now, I would suggest using SQL for the same behavior. I hope
someone
Hi Patrick,
I could imagine that LEAD/LAG are translated into RANK/ROW_NUMBER
operations that are not supported in this context.
But I will loop in @Jark who might know more about the limitations here.
Regards,
Timo
On 29.01.21 17:37, Patrick Angeles wrote:
Another (hopefully newbie) questi
Hi Devin,
Flink supports arbitrary data types. You can simply read the JSON object
as a big string first and process the individual event types in a UDF
using e.g. the Jackson library.
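A rough sketch of such a UDF (the field name "eventType" is made up):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

public class ExtractEventType extends ScalarFunction {

    private transient ObjectMapper mapper;

    @Override
    public void open(FunctionContext context) {
        mapper = new ObjectMapper();
    }

    public String eval(String json) {
        try {
            JsonNode node = mapper.readTree(json);
            return node.has("eventType") ? node.get("eventType").asText() : null;
        } catch (Exception e) {
            return null; // or fail, depending on how strict parsing should be
        }
    }
}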
Are you using SQL or DataStream API?
An alternative is to set the "fail-on-missing-field" flag to false. Th
Hi Shridhar,
the exception indicates that something is wrong with the object
serialization. Kryo is unable to serialize the given object.
It might help to
1) register a custom Kryo serializer in the ExecutionConfig, or
2) pass dedicated type information using the types from
org.apache.flink
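Two corresponding sketches (MyEvent, MyEventSerializer, source and parse are placeholders; the serializer would be a Kryo Serializer subclass):

// 1) register a custom Kryo serializer for the problematic class
env.getConfig().registerTypeWithKryoSerializer(MyEvent.class, MyEventSerializer.class);

// 2) or pass dedicated type information so the generic Kryo fallback is avoided
DataStream<MyEvent> events = source
    .map(value -> parse(value))
    .returns(TypeInformation.of(MyEvent.class));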
Hi Omkar,
sorry for the late reply. This sounds like a serious issue. It looks
like some of the RocksDB data is corrupt. Are you sure this is not a
problem of your storage layer?
Otherwise I would investigate whether the serializers work correctly.
Maybe Beam put corrupt data into Flink
Hi Jiazhi,
I think an OVER window might solve your use case. It gives you a rolling
aggregation over a period of time. Maybe you need to define a custom
aggregate function to emit the final record as you need it.
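A small sketch of such an OVER window (table/column names are made up, and `ts` must be a time attribute):

Table result = tEnv.sqlQuery(
    "SELECT userId, ts, " +
    "  SUM(amount) OVER (" +
    "    PARTITION BY userId ORDER BY ts " +
    "    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) AS rollingSum " +
    "FROM Orders");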
Let me know if you have further questions.
Regards,
Timo
On 27.01.21 15:02, Jiazhi wrote:
Hi Rex,
processing-time gives you no alignment of operators across nodes. Each
operation works with its local machine clock, which might be interrupted
by the OS, Java garbage collector, etc. It is always best-effort timing.
Regards,
Timo
On 27.01.21 18:16, Rex Fenley wrote:
Hello,
I'm lo
Hi Sebastian,
sorry for the late reply. Were you able to solve the problem in the meantime?
It definitely looks like a dependency conflict.
Regards,
Timo
On 22.01.21 18:18, Sebastián Magrí wrote:
Thanks a lot Matthias!
In the meantime I'm trying out something with the scala quickstart.
On Fri,
pointed with its previous proctime, then
it may be misaligned in the next run and drop rows that were in that window.
On Mon, Feb 1, 2021 at 6:37 AM Timo Walther <twal...@apache.org> wrote:
Hi Rex,
processing-time gives you no alignment of operators across nodes. Each
Hi Yuval,
yes this is rather a bug. If we support VARCHAR here we should also
support CHAR. Feel free to open an issue.
Regards,
Timo
On 03.02.21 11:46, Yuval Itzchakov wrote:
I can understand that in some sense it's nonsensical to MAX on a CHAR,
since Blink will only determine a CHAR when t
Hi Yuval,
we changed this behavior a bit to be more SQL compliant. Currently,
sinks must be explicitly defined with a PRIMARY KEY constraint. We had
discussions about implicit sinks, but nothing on the roadmap yet. The
`CREATE TEMPORARY TABLE LIKE` clause should make it easy to extend the
ori
,
Timo
On 03.02.21 16:24, Yuval Itzchakov wrote:
Hi Timo,
The problem with this is I would still have to determine the keys
manually, which is not really feasible in my case. Is there any internal
API that might be of use to extract this information?
On Wed, Feb 3, 2021 at 5:19 PM Timo Walther
Hi Xavier,
the Scala API has special implicits in methods such as `DataStream.map()`
or `DataStream.keyBy()` to support Scala specifics like case classes. For
Scala one needs to use the macro `createTypeInformation[CaseClass]`; for
Java we use reflection via `TypeInformation.of()`. But Scala and
,
Thank you for your clarification, it is very useful to me. I am also
combining it with the implementation of the map function, trying to do an implicit
conversion of the case class so that I can restore state from FS.
On Fri, Feb 5, 2021 at 10:38 PM Timo Walther <twal...@apache.org> wrote:
Hi Yongsong,
in newer Flink versions we introduced the concept of statement sets,
which are available via `TableEnvironment.createStatementSet()`. They
allow you to optimize a branching pipeline as a whole while reusing subplans.
In older Flink versions, you can convert the Table to a DataStre
Hi,
could the problem be that you are mixing OVER and TUMBLE windows with
each other? The TUMBLE window is correctly defined over the time attribute `row_ts`
but the OVER window is defined using a regular column `upd_ts`. This
might be the reason why the query is not append-only but updating.
Maybe you ca
Hi Abdelilah,
at first glance your logic seems to be correct. But Arvid is right
that your pipeline might not have the optimal performance that Flink can
offer due to the 3 groupBy operations. I'm wondering what the optimizer
produces out of this plan. Maybe you can share it with us using
`
that will matter for our case, we just want to make sure we don't lose
any data or have any gaps between windows.
Please confirm if I got this right, and thank you much for your reply!
On Tue, Feb 2, 2021 at 3:17 AM Timo Walther <twal...@apache.org> wrote:
As far as I
Hi Dan,
the order of all joins depends on the order in the SQL query by default.
You can also check the following example (not interval joins though) and
swap e.g. b and c:
env.createTemporaryView("a", env.fromValues(1, 2, 3));
env.createTemporaryView("b", env.fromValues(4, 5, 6));
env.create
After thinking about this topic again, I think UNION ALL will not solve
the problem because you would need to group by brandId and perform the
joining within the aggregate function which could also be quite expensive.
Regards,
Timo
On 11.02.21 17:16, Timo Walther wrote:
Hi Abdelilah,
at a
QL to DataStream.
On Thu, Feb 11, 2021 at 9:11 AM Timo Walther <twal...@apache.org> wrote:
Hi Dan,
the order of all joins depends on the order in the SQL query by default.
You can also check the following example (not interval joins though)
and
swa