Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
The mysql connector is put in the client classpath and in the Flink lib
dir. When I debugged remotely, the AbandonedConnectionCleanupThread was
initialized at the first run of the job by the taskmanager. Today I'll try
to run the mysql connector in a standalone java app to see whether the property
is read correctly or not.
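
A minimal standalone check along those lines could look like the sketch below (the
class name is just a placeholder; it only prints what the JVM sees for the property
used in this thread):

public class PropCheck {
  public static void main(String[] args) {
    // run with: java -Dcom.mysql.disableAbandonedConnectionCleanup=true PropCheck
    String key = "com.mysql.disableAbandonedConnectionCleanup";
    System.out.println("raw value:  " + System.getProperty(key));
    // Boolean.getBoolean(key) is the same kind of check as in the connector snippet quoted below
    System.out.println("as boolean: " + Boolean.getBoolean(key));
  }
}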

On Fri, 20 Nov 2020, 07:52 Arvid Heise  wrote:

> Hi Flavio,
>
> if it arrives in the java process then you are doing everything right
> already (or almost).
>
> Are you shading the mysql connector? I'm suspecting that the property also
> gets shaded then. You could decompile your jar to be sure. Have you verified
> that this is working as intended without Flink?
>
> On Thu, Nov 19, 2020 at 9:19 PM Flavio Pompermaier 
> wrote:
>
>> the properties arrive at the task manager because I can see them in the
>> java process (using ps aux)... or do you mean some special line of code?
>>
>> On Thu, 19 Nov 2020, 20:53 Arvid Heise  wrote:
>>
>>> Hi Flavio,
>>>
>>> you are right, all looks good.
>>>
>>> Can you please verify if the properties arrived at the task manager in
>>> the remote debugger session? For example, you could check the JVisualVM
>>> Overview tab.
>>>
>>> On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier 
>>> wrote:
>>>
 At the moment I use a standalone cluster, isn't using env.java.opts the
 right way to do it?

On Thu, 19 Nov 2020, 20:11 Arvid Heise  wrote:

> Hi Flavio,
>
> -D afaik passes only the system property to the entry point (client or
> jobmanager depending on setup), while you probably want to have it on the
> task managers.
>
> The specific options to pass it to the task managers depend on the way
> you deploy. -yD for yarn for example. For docker or k8s, you would use 
> env.
>
> On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> Hi to all,
>> while trying to solve a leak with dynamic class loading I found out
>> that the mysql connector creates an AbandonedConnectionCleanupThread that
>> is retained in the ChildFirstClassLoader. From version 8.0.22 there is
>> the possibility to inhibit this thread by passing the system property
>> com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the mysql
>> jar in the lib folder).
>>
>> I tried to set in the flink-conf.yml
>> env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"
>>
>> but the property does not produce the desired effect in the static
>> section of such a thread [2] (I verified that attaching the remote
>> debugger to the task manager).
>>
>> How can I fix this problem?
>>
>> [1]
>> https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
>> [2]
>> public class AbandonedConnectionCleanupThread implements Runnable {
>>
>>     private static boolean abandonedConnectionCleanupDisabled =
>>         Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);
>>
>>     static {
>>         if (abandonedConnectionCleanupDisabled) {
>>             cleanupThreadExecutorService = null;
>>         } else {
>>             cleanupThreadExecutorService =
>>                 Executors.newSingleThreadExecutor(r -> { ... });
>>         }
>>     }
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason,
> Ji (Toni) Cheng
>

>>>
>>>
>>
>

Re: Dynamic ad hoc query deployment strategy

2020-11-20 Thread Kostas Kloudas
I am also cc'ing Timo to see if he has anything more to add on this.

Cheers,
Kostas

On Thu, Nov 19, 2020 at 9:41 PM Kostas Kloudas  wrote:
>
> Hi,
>
> Thanks for reaching out!
>
> First of all, I would like to point out that an interesting
> alternative to the per-job cluster could be running your jobs in
> application mode [1].
>
> Given that you want to run arbitrary SQL queries, I do not think you
> can "share" across queries the part of the job graph that reads a
> topic. In general, Flink (not only in SQL) creates the graph of a job
> before the job is executed. And especially in SQL you do not even have
> control over the graph, as the translation logic from query to
> physical operators is opaque and not exposed to the user.
>
> That said, you may want to have a look at [2]. It is pretty old but it
> describes a potentially similar use case. Unfortunately, it does not
> support SQL.
>
> Cheers,
> Kostas
>
> [1] https://flink.apache.org/news/2020/07/14/application-mode.html
> [2] https://www.ververica.com/blog/rbea-scalable-real-time-analytics-at-king
>
> On Sun, Nov 15, 2020 at 10:11 AM lalala  wrote:
> >
> > Hi all,
> >
> > I would like to consult with you regarding deployment strategies.
> >
> > We have 250+ Kafka topics against which we want users of the platform to submit SQL
> > queries that will run indefinitely. We have a query parser to extract topic
> > names from user queries, and the application locally creates Kafka tables
> > and executes the query. The result can be collected to multiple sinks such as
> > databases, files, and cloud services.
> >
> > We want to have the best isolation between queries, so in case of failures,
> > the other jobs will not get affected. We have a huge YARN cluster to handle
> > a 1 PB/day scale from Kafka. I believe a per-job cluster deployment makes
> > sense for the sake of isolation. However, that creates some scalability
> > problems. There might be several SQL queries running on the same Kafka topic, and we
> > do not want to read the topic again for each query in different sessions. The
> > ideal case is that we read the topic once and execute multiple queries on
> > this data to avoid rereading the same topic. That breaks the desire of a
> > fully isolated system, but it improves network and Kafka performance and
> > still provides isolation on the topic level, as we just read the topic once
> > and execute multiple SQL queries on it.
> >
> > We are quite new to Flink, but we have experience with Spark. In Spark, we
> > can submit an application whose driver can listen to a query queue and
> > submit jobs to the cluster dynamically from different threads. However, in
> > Flink, it looks like the main() has to produce the job graph in advance.
> >
> > We do use an EMR cluster; what would you recommend for my use case?
> >
> > Thank you.
> >
> >
> >
> > --
> > Sent from: 
> > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-11-20 Thread Kostas Kloudas
Hi Hector,

The main reasons for deprecating readFileStream() were that:
1) it was only capable of parsing Strings, and in a rather limited way,
as one could not even specify the encoding
2) it was not fault-tolerant, so your concerns about exactly-once were
not covered

One concern about keeping the last read index for every file that we
have seen so far is that this would simply blow up the memory.

Two things I would like to also mention are that:
1) the method has been deprecated a long time ago.
2) there is a new FileSource coming with 1.12 that may be interesting
for you [1].

Cheers,
Kostas

 [1] 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/FileSource.java
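
For example, continuously monitoring a directory with the new FileSource could look
roughly like the following sketch (untested, written against the 1.12 API; the path
and polling interval are made up):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSourceSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // read text lines and keep checking the directory for new files every 10 seconds
    FileSource<String> source =
        FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("/tmp/input"))
            .monitorContinuously(Duration.ofSeconds(10))
            .build();

    DataStream<String> lines =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
    lines.print();
    env.execute("FileSource sketch");
  }
}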

On Tue, Nov 17, 2020 at 4:30 AM Hector He  wrote:
>
> May I ask about the deprecation of readFileStream(...): is there an
> alternative to this method? The source code led me to use readFile instead, but
> it does not behave like readFileStream. readFileStream reads file content
> incrementally, while readFile with the FileProcessingMode.PROCESS_CONTINUOUSLY
> argument re-reads the whole file content every time the content changes. So why
> did Flink deprecate readFileStream without providing a better alternative?
>
> According to the description in the official documentation at the link below,
> FileProcessingMode.PROCESS_CONTINUOUSLY will break the “exactly-once”
> semantics.
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/datastream_api.html#data-sources
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
I was particularly asking if you relocate classes. Since the property name
looks like a class name, it could have been changed as well. Could you
check the value of
PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final
jar?

On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier 
wrote:

> The mysql connector is put in the client classpath and in the Flink lib
> dir. When I debugged remotely, the AbandonedConnectionCleanupThread was
> initialized at the first run of the job by the taskmanager. Today I'll try
> to run the mysql connector in a standalone java app to see whether the property
> is read correctly or not.
>
> On Fri, 20 Nov 2020, 07:52 Arvid Heise  wrote:
>
>> Hi Flavio,
>>
>> if it arrives in the java process then you are doing everything right
>> already (or almost).
>>
>> Are you shading the mysql connector? I'm suspecting that the property
>> also gets shaded then. You could decompile your jar to be sure. Have you
>> verified that this is working as intended without Flink?
>>
>> On Thu, Nov 19, 2020 at 9:19 PM Flavio Pompermaier 
>> wrote:
>>
>>> the properties arrive at the task manager because I can see them in the
>>> java process (using ps aux)... or do you mean some special line of code?
>>>
>>> On Thu, 19 Nov 2020, 20:53 Arvid Heise  wrote:
>>>
 Hi Flavio,

 you are right, all looks good.

 Can you please verify if the properties arrived at the task manager in
 the remote debugger session? For example, you could check the JVisualVM
 Overview tab.

 On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> At the moment I use a standalone cluster, isn't using env.java.opts
> the right way to do it?
>
> On Thu, 19 Nov 2020, 20:11 Arvid Heise  wrote:
>
>> Hi Flavio,
>>
>> -D afaik passes only the system property to the entry point (client
>> or jobmanager depending on setup), while you probably want to have it on
>> the task managers.
>>
>> The specific options to pass it to the task managers depend on the
>> way you deploy. -yD for yarn for example. For docker or k8s, you would 
>> use
>> env.
>>
>> On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>> while trying to solve a leak with dynamic class loading I found out
>>> that mysql connector creates an AbandonedConnectionCleanupThread that
>>> is retained in the ChildFirstClassLoader..from version 8.0.22 there's
>>> the possibility to inhibit this thread passing the system property
>>> com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the mysql
>>> jar in the lib folder).
>>>
>>> I tried to set in the flink-conf.yml
>>> env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"
>>>
>>> but the property does not produce the desired effect in the static
>>> section of such a thread [2] (I verified that attaching the remote
>>> debugger to the task manager).
>>>
>>> How can I fix this problem?
>>>
>>> [1]
>>> https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
>>> [2]
>>> public class AbandonedConnectionCleanupThread implements Runnable {
>>>
>>>     private static boolean abandonedConnectionCleanupDisabled =
>>>         Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);
>>>
>>>     static {
>>>         if (abandonedConnectionCleanupDisabled) {
>>>             cleanupThreadExecutorService = null;
>>>         } else {
>>>             cleanupThreadExecutorService =
>>>                 Executors.newSingleThreadExecutor(r -> { ... });
>>>         }
>>>     }
>>>
>>
>>
>>
>


Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
no no I didn't relocate any class related to jdbc

On Fri, 20 Nov 2020, 10:02 Arvid Heise  wrote:

> I was particularly asking if you relocate classes. Since the property name
> looks like a class name, it could have been changed as well. Could you
> check the value of
> PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final
> jar?
>
> On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier 
> wrote:
>
>> The mysql connector is put in the client classpath and in the Flink lib
>> dir. When I debugged remotely, the AbandonedConnectionCleanupThread was
>> initialized at the first run of the job by the taskmanager. Today I'll try
>> to run the mysql connector in a standalone java app to see whether the property
>> is read correctly or not.
>>
>> On Fri, 20 Nov 2020, 07:52 Arvid Heise  wrote:
>>
>>> Hi Flavio,
>>>
>>> if it arrives in the java process then you are doing everything right
>>> already (or almost).
>>>
>>> Are you shading the mysql connector? I'm suspecting that the property
>>> also gets shaded then. You could decompile your jar to be sure. Have you
>>> verified that this is working as intended without Flink?
>>>
>>> On Thu, Nov 19, 2020 at 9:19 PM Flavio Pompermaier 
>>> wrote:
>>>
the properties arrive at the task manager because I can see them in
the java process (using ps aux)... or do you mean some special line of code?

On Thu, 19 Nov 2020, 20:53 Arvid Heise  wrote:

> Hi Flavio,
>
> you are right, all looks good.
>
> Can you please verify if the properties arrived at the task manager in
> the remote debugger session? For example, you could check the JVisualVM
> Overview tab.
>
> On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> At the moment I use a standalone cluster, isn't using env.java.opts
>> the right way to do it?
>>
>> On Thu, 19 Nov 2020, 20:11 Arvid Heise  wrote:
>>
>>> Hi Flavio,
>>>
>>> -D afaik passes only the system property to the entry point (client
>>> or jobmanager depending on setup), while you probably want to have it on
>>> the task managers.
>>>
>>> The specific options to pass it to the task managers depend on the
>>> way you deploy. -yD for yarn for example. For docker or k8s, you would 
>>> use
>>> env.
>>>
>>> On Wed, Nov 18, 2020 at 10:20 PM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 Hi to all,
 while trying to solve a leak with dynamic class loading I found out
 that mysql connector creates an AbandonedConnectionCleanupThread
 that
 is retained in the ChildFirstClassLoader..from version 8.0.22
 there's
 the possibility to inhibit this thread passing the system property
 com.mysql.disableAbandonedConnectionCleanup=true [1] (I put the
 mysql
 jar in the lib folder).

 I tried to set in the flink-conf.yml
 env.java.opts: "-Dcom.mysql.disableAbandonedConnectionCleanup=true"

 but the property does not produce the desired effect in the static
 section of such a thread [2] (I verified that attaching the remote
 debugger to the task manager).

 How can I fix this problem?

 [1]
 https://dev.mysql.com/doc/relnotes/connector-j/8.0/en/news-8-0-22.html
 [2]
public class AbandonedConnectionCleanupThread implements Runnable {

    private static boolean abandonedConnectionCleanupDisabled =
        Boolean.getBoolean(PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup);

    static {
        if (abandonedConnectionCleanupDisabled) {
            cleanupThreadExecutorService = null;
        } else {
            cleanupThreadExecutorService =
                Executors.newSingleThreadExecutor(r -> { ... });
        }
    }

>>>
>>>
>>>
>>
>

Re: PyFlink - Scala UDF - How to convert Scala Map in Table API?

2020-11-20 Thread Pierre Oberholzer
Hi Wei,

Thanks for the hint. May I please follow up by adding more context and ask
for your guidance.

In case the aforementioned Map[String,Any] object returned by Scala:

- Has a defined schema (incl. nested) with up to 100k (!) different
possible keys
- Has only some portion of the keys populated for each record
- Is convertible to JSON
- Has to undergo downstream processing in Flink and/or Python UDF with key
value access
- Has to be ultimately stored in a Kafka/AVRO sink

How would you declare the types explicitly in such a case ?

Thanks for your support !

Pierre

On Thu, 19 Nov 2020 at 03:54, Wei Zhong  wrote:

> Hi Pierre,
>
> Currently there is no type hint like ‘Map[String, Any]’. The recommended
> way is declaring your type more explicitly.
>
> If you insist on doing this, you can try declaring a RAW data type for
> java.util.HashMap [1], but you may encounter some troubles [2] related to
> the kryo serializers.
>
> Best,
> Wei
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2]
> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>
>
> On 19 Nov 2020, at 04:31, Pierre Oberholzer  wrote:
>
> Hi Wei,
>
> It works ! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had the wrong
> syntax for option 2.
>
> So to summarize..
>
> Methods working:
> - Current: DataTypeHint in UDF definition + SQL for UDF registering
> - Outdated: override getResultType in UDF definition
> + t_env.register_java_function for UDF registering
>
> Type conversions working:
> - scala.collection.immutable.Map[String,String] =>
> org.apache.flink.types.Row => ROW
> - scala.collection.immutable.Map[String,String] =>
> java.util.Map[String,String] => MAP
>
> Any hint for Map[String,Any] ?
>
> Best regards,
>
On Wed, 18 Nov 2020 at 03:26, Wei Zhong  wrote:
>
>> Hi Pierre,
>>
>> Both approaches work on my local machine; this is my code:
>>
>> Scala UDF:
>>
>> package com.dummy
>>
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>>
>> /**
>>   * The scala UDF.
>>   */
>> class dummyMap extends ScalarFunction {
>>
>>   // If the udf would be registered by the SQL statement, you need add this 
>> typehint
>>   @DataTypeHint("ROW<s STRING, t STRING>")
>>   def eval(): Row = {
>>
>> Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>
>>   }
>>
>>   // If the udf would be registered by the method 'register_java_function', 
>> you need override this
>>   // method.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
>> = {
>> // The type of the return values should be TypeInformation
>> Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
>> Types.STRING()))
>>   }
>> }
>>
>> Python code:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>>
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>>
>> # load the scala udf jar file, the path should be modified to yours
>> # or your can also load the jar file via other approaches
>> st_env.get_config().get_configuration().set_string("pipeline.jars", "
>> file:///Users/zhongwei/the-dummy-udf.jar")
>>
>> # register the udf via
>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap'
>> LANGUAGE SCALA")
>> # or register via the method
>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>
>> # prepare source and sink
>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
>> 'b', 'c'])
>> st_env.execute_sql("""create table mySink (
>> output_of_my_scala_udf ROW<s STRING, t STRING>
>> ) with (
>> 'connector' = 'print'
>> )""")
>>
>> # execute query
>>
>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>
>> Best,
>> Wei
>>
>> On 18 Nov 2020, at 03:28, Pierre Oberholzer  wrote:
>>
>> Hi Wei,
>>
>> True, I'm using the method you mention, but glad to change.
>> I tried your suggestion instead, but got a similar error.
>>
>> Thanks for your support. That is much more tedious than I thought.
>>
>> *Option 1 - SQL UDF*
>>
>> *SQL UDF*
>> create_func_ddl = """
>> CREATE FUNCTION dummyMap
>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>> """
>>
>> t_env.execute_sql(create_func_ddl)
>>
>> *Error*
>> Py4JJavaError: An error occurred while calling o672.execute.
>> : org.apache.flink.table.api.TableException: Result field does not match
>> requested type. Requested: Row(s: String, t: String); Actual:
>> GenericType
>>
>> *Option 2 *- *Overriding getResultType*
>>
>> Back to the old registering method, but overriding getResultType:
>>
>> t_env.register_java_function("dummyMap","com.

Re: How to use EventTimeSessionWindows.withDynamicGap()

2020-11-20 Thread Aljoscha Krettek

Sure, my pleasure!

Aljoscha

On 19.11.20 16:12, Simone Cavallarin wrote:

Many thanks for the Help!!

Simone


From: Aljoscha Krettek 
Sent: 19 November 2020 11:46
To: user@flink.apache.org 
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

On 17.11.20 17:37, Simone Cavallarin wrote:

Hi,

I have been working on the suggestion that you gave me, thanks! The first part is to add the
gap to the message: 1) I receive the event, 2) I take that event and map it using
StatefulSessionCalculator, which is where I put together "the message" and the
"long" that is my gap in millis.

DataStream<MyMessageType> source = ...

Operation in front of the window that keeps track of session gaps:

DataStream<Tuple2<MyMessageType, Long>> enriched = source
    .keyBy(...)
    .map(new StatefulSessionCalculator()); // or process()

This is my StatefulSessionCalculator():

Tuple2<MyMessageType, Long> map(MyMessageType input) {
    ValueState<MyState> valueState = getState(myModelStateDescriptor);
    MyState state = valueState.value();
    state.update(input);
    long suggestedGap = state.getSuggestedGap();
    valueState.update(state);
    return Tuple2.of(input, suggestedGap);
}

If the "gap" calculated is "1234".
The result would be: [Tom, 1.70, 50, 1605612588995], [1234]>?


That looks correct, yes.


The second step is to use the gap calculated through  
DynamicWindowGapExtractor().

DataStream<...> result = enriched
 .keyBy(new MyKeySelector())
 .window(EventTimeSessionWindows.withDynamicGap(new 
DynamicWindowGapExtractor()))


The DynamicWindowGapExtractor() extracts the gap from the message and feeds it
back to Flink.
Could you please give me an example also for this one?


This would just be a class that implements
SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> and returns the gap
from the extract() method.
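
For illustration, a bare-bones version of it could be (using the Tuple2<MyMessageType, Long>
element type from your snippet above; MyMessageType is your placeholder class):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;

public class DynamicWindowGapExtractor
        implements SessionWindowTimeGapExtractor<Tuple2<MyMessageType, Long>> {

    @Override
    public long extract(Tuple2<MyMessageType, Long> element) {
        // f1 carries the gap (in milliseconds) computed by StatefulSessionCalculator
        return element.f1;
    }
}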


One thing that I don't understand is that after enriching the message, my event,
which contains a POJO, is nested inside a tuple. How can I access it?


You would just read the first field of the tuple, i.e. tuple.f0.



The last point: when you said "I think, however, that it might be easier at this
point to just use a stateful ProcessFunction", did you mean that a completely different
approach would be better?


That's what I meant, yes, because it seems too complicated to split the
logic into the part that determines the dynamic gap and then another
part that does the computation per session. It seems easier to just roll
that into one operator that does everything. And with state and timers
you should have enough flexibility.
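
A rough skeleton of such a single operator could look like the sketch below (keyed by a
String here just for illustration; the state layout, names and output type are made up):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// input is (message, gapMillis), output is one String per closed session
public class DynamicSessionFunction
        extends KeyedProcessFunction<String, Tuple2<String, Long>, String> {

    private transient ValueState<Long> sessionEnd; // when the current session would close
    private transient ValueState<Long> count;      // whatever per-session aggregate is needed

    @Override
    public void open(Configuration parameters) {
        sessionEnd = getRuntimeContext().getState(new ValueStateDescriptor<>("end", Long.class));
        count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Long> in, Context ctx, Collector<String> out)
            throws Exception {
        long c = count.value() == null ? 0L : count.value();
        count.update(c + 1);

        // extend the session by the dynamic gap carried in the element (assumes event time is set)
        long newEnd = ctx.timestamp() + in.f1;
        Long oldEnd = sessionEnd.value();
        if (oldEnd != null) {
            ctx.timerService().deleteEventTimeTimer(oldEnd);
        }
        sessionEnd.update(newEnd);
        ctx.timerService().registerEventTimeTimer(newEnd);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        // session closed: emit the aggregate and clear all state
        out.collect(ctx.getCurrentKey() + " -> " + count.value() + " events");
        count.clear();
        sessionEnd.clear();
    }
}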

Best,
Aljoscha






Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
I've just tested the following code in a java class and the property
(-Dcom.mysql.cj.disableAbandonedConnectionCleanup=true)  is read correctly
and the abandonedConnectionCleanupDisabled does not initialize
the cleanupThreadExecutorService (that in my other test was causing a
dynamic classloading memory leak):

 try {
  Class.forName(drivername);
  Connection dbConn = DriverManager.getConnection(dbURL, username,
password);
  System.out.println("OK");
} catch (SQLException se) {
  throw new IllegalArgumentException("open() failed." +
se.getMessage(), se);
} catch (ClassNotFoundException cnfe) {
  throw new IllegalArgumentException("JDBC-Class not found. - " +
cnfe.getMessage(), cnfe);
}

This is the output of the ps aux command (I also notice that the
taskmanager parameters are created using a space after the -D option, which I
don't know is intended or not):

flink23904 59.5  1.1 5488264 380396 pts/5  Sl   10:41   0:08 java
-Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458
-XX:MaxMetaspaceSize=268435456
-Dcom.mysql.disableAbandonedConnectionCleanup=true
-Dlog.file=/opt/flink/flink-1.11.0/log/flink-flink-taskexecutor-0-flavio-ThinkPad-P50.log
-Dlog4j.configuration=file:/opt/flink/flink-1.11.0/conf/log4j.properties
-Dlog4j.configurationFile=file:/opt/flink/flink-1.11.0/conf/log4j.properties
-Dlogback.configurationFile=file:/opt/flink/flink-1.11.0/conf/logback.xml
-classpath
/opt/flink/flink-1.11.0/lib/flink-csv-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-json-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/flink-1.11.0/lib/flink-table_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-table-blink_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-core-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/flink-1.11.0/lib/mariadb-java-client-2.6.0.jar:/opt/flink/flink-1.11.0/lib/mssql-jdbc-7.4.1.jre11.jar:/opt/flink/flink-1.11.0/lib/mysql-connector-java-8.0.22.jar:/opt/flink/flink-1.11.0/lib/flink-dist_2.12-1.11.0.jar:::
org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir
/opt/flink/flink-1.11.0/conf -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=134217730b -D
taskmanager.memory.network.min=134217730b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=402653174b -D
taskmanager.memory.task.off-heap.size=0b

The mysql dependency in the maven project is:


<dependency>
   <groupId>mysql</groupId>
   <artifactId>mysql-connector-java</artifactId>
   <version>8.0.22</version>
   <scope>provided</scope>
</dependency>


On Fri, Nov 20, 2020 at 10:07 AM Flavio Pompermaier 
wrote:

> no no I didn't relocate any class related to jdbc
>
> On Fri, 20 Nov 2020, 10:02 Arvid Heise  wrote:
>
>> I was particularly asking if you relocate classes. Since the property
>> name looks like a class name, it could have been changed as well. Could you
>> check the value of
>> PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final
>> jar?
>>
>> On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier 
>> wrote:
>>
>>> The mysql connector is put in the client classpath and in the Flink lib
>>> dir. When I debugged remotely, the AbandonedConnectionCleanupThread was
>>> initialized at the first run of the job by the taskmanager. Today I'll try
>>> to run the mysql connector in a standalone java app to see whether the property
>>> is read correctly or not.
>>>
>>> On Fri, 20 Nov 2020, 07:52 Arvid Heise  wrote:
>>>
 Hi Flavio,

 if it arrives in the java process then you are doing everything right
 already (or almost).

 Are you shading the mysql connector? I'm suspecting that the property
also gets shaded then. You could decompile your jar to be sure. Have you
 verified that this is working as intended without Flink?

 On Thu, Nov 19, 2020 at 9:19 PM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> the properties arrive at the task manager because I can see them in
> the java process (using ps aux)... or do you mean some special line of code?
>
> On Thu, 19 Nov 2020, 20:53 Arvid Heise  wrote:
>
>> Hi Flavio,
>>
>> you are right, all looks good.
>>
>> Can you please verify if the properties arrived at the task manager
>> in the remote debugger session? For example, you could check the 
>> JVisualVM
>> Overview tab.
>>
>> On Thu, Nov 19, 2020 at 8:38 PM Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:
>>
>>> At the moment I use a standalone cluster, isn't using env.java.opts
>>> the right way to do it?
>>>
>>> On Thu, 19 Nov 2020, 20:11 Arvid Heise  wrote:
>>>
 Hi Flavio,

 -D afaik passes only the system property to the entry point (client
 or jobmanager depend

Re: State of Machine Learning with Flink and especially FLIP-39

2020-11-20 Thread Niklas Wilcke
Hi Arvid and Jiangjie,

thanks to both of you for the quick and valuable response.
I will take a look at the linked projects.

Kind Regards,
Niklas


--
niklas.wil...@uniberg.com
Mobile: +49 160 9793 2593
Office: +49 40 2380 6523

Simon-von-Utrecht-Straße 85a
20359 Hamburg

UNIBERG GmbH
Registration court: Amtsgericht Kiel HRB SE-1507
Managing Directors: Andreas Möller, Martin Ulbricht

Information per Art. 13 GDPR (B2B):
For communication with you we may process your personal data.
All information on how we handle your data can be found at
https://www.uniberg.com/impressum.html.

> On 20. Nov 2020, at 03:07, Becket Qin  wrote:
> 
> Hi Niklas,
> 
> We dropped the Flink ML lib in 1.9 and plan to replace it with a new machine 
> learning library for traditional machine learning algorithms. And that 
> library will be based on FLIP-39. The plan was pushed back a little bit
> because we plan to deprecate the DataSet API but haven't got batch
> iteration support in the DataStream API yet. So at this point we don't have an ML
> lib implementation in Flink.
> 
> That being said, we are working with the community to add some ML related 
> features to Flink. At this point, we have the following two projects 
> available from Alibaba that will likely be contributed to Flink. You may also 
> take a look at them.
> 
> Alink -  A machine learning library.
> https://github.com/alibaba/alink 
> 
> Flink-AI-Extended - A project that helps run TF / PyTorch on top of Flink.
> https://github.com/alibaba/flink-ai-extended 
> 
> 
> Thanks,
> 
> Jiangjie (Becket) Qin
> 
> On Fri, Nov 20, 2020 at 3:43 AM Arvid Heise  wrote:
> Hi Niklas,
> 
> indeed some efforts on the machine learning libraries are pushed back in 
> favor of getting proper PyTorch and Tensorflow support through PyFlink. 
> 
> Native implementations in Flink have been done so far in the DataSet API, 
> which is going to be deprecated in the next few releases in favor of the unified
> DataStream API with bounded streams. I expect efforts for native 
> implementations to be picked up once DataSet is fully replaced to avoid 
> doubling the work. One of the most important features that is lacking is 
> proper iteration support in DataStream.
> 
> On Thu, Nov 19, 2020 at 1:34 PM Niklas Wilcke  wrote:
> Hi Flink-Community,
> 
> I'm digging through the history of FlinkML and FLIP-39 [0]. What I understood
> so far is that FlinkML was removed in 1.9 because it had become unmaintained.
> I'm not really able to find out whether FLIP-39 and providing a replacement
> for FlinkML are currently being worked on. The umbrella Jira ticket FLINK-12470 [1]
> looks stale to me.
> Was there maybe a change of strategy in the meantime? Is the focus currently
> on PyFlink to provide ML solutions (FLIP-96 [2])?
> It would be really interesting to get some insights about the future and 
> roadmap of ML in the Flink ecosystem. Thank you very much!
> 
> Kind Regards,
> Niklas
> 
> [0] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-39+Flink+ML+pipeline+and+ML+libs
>  
> 
> [1] https://issues.apache.org/jira/browse/FLINK-12470 
> 
> [2] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-96%3A+Support+Python+ML+Pipeline+API
>  
> 
> 





Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
All looks good and as it should be.

Can you do a remote debugging session to the TM once more and check
Boolean.getBoolean("com.mysql.disableAbandonedConnectionCleanup")?

There is no magic involved in System properties in Flink. If the property
is set on the process, the configuration works. If it's in the project,
mysql can access it.
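
One way to double-check from inside a running task (rather than from the process
arguments) is a throw-away job along these lines; the class and job names are made up:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PropertyProbeJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1)
       .map(new RichMapFunction<Integer, String>() {
         @Override
         public void open(Configuration parameters) {
           // runs on the TaskManager: print what this task's JVM actually sees
           String key = "com.mysql.disableAbandonedConnectionCleanup";
           System.out.println(key + " = " + System.getProperty(key)
               + " (as boolean: " + Boolean.getBoolean(key) + ")");
         }

         @Override
         public String map(Integer value) {
           return value.toString();
         }
       })
       .print();
    env.execute("property probe");
  }
}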

On Fri, Nov 20, 2020 at 10:46 AM Flavio Pompermaier 
wrote:

> I've just tested the following code in a java class and the property
> (-Dcom.mysql.cj.disableAbandonedConnectionCleanup=true)  is read correctly
> and the abandonedConnectionCleanupDisabled does not initialize
> the cleanupThreadExecutorService (that in my other test was causing a
> dynamic classloading memory leak):
>
>  try {
>   Class.forName(drivername);
>   Connection dbConn = DriverManager.getConnection(dbURL, username,
> password);
>   System.out.println("OK");
> } catch (SQLException se) {
>   throw new IllegalArgumentException("open() failed." +
> se.getMessage(), se);
> } catch (ClassNotFoundException cnfe) {
>   throw new IllegalArgumentException("JDBC-Class not found. - " +
> cnfe.getMessage(), cnfe);
> }
>
> This is the output of ps aux command (I can also notice that the
> taskmanager parameters are created using a space after the -D option that I
> don't know if it intended or not):
>
> flink23904 59.5  1.1 5488264 380396 pts/5  Sl   10:41   0:08 java
> -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458
> -XX:MaxMetaspaceSize=268435456
> -Dcom.mysql.disableAbandonedConnectionCleanup=true
> -Dlog.file=/opt/flink/flink-1.11.0/log/flink-flink-taskexecutor-0-flavio-ThinkPad-P50.log
> -Dlog4j.configuration=file:/opt/flink/flink-1.11.0/conf/log4j.properties
> -Dlog4j.configurationFile=file:/opt/flink/flink-1.11.0/conf/log4j.properties
> -Dlogback.configurationFile=file:/opt/flink/flink-1.11.0/conf/logback.xml
> -classpath
> /opt/flink/flink-1.11.0/lib/flink-csv-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-json-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/flink-1.11.0/lib/flink-table_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-table-blink_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-core-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/flink-1.11.0/lib/mariadb-java-client-2.6.0.jar:/opt/flink/flink-1.11.0/lib/mssql-jdbc-7.4.1.jre11.jar:/opt/flink/flink-1.11.0/lib/mysql-connector-java-8.0.22.jar:/opt/flink/flink-1.11.0/lib/flink-dist_2.12-1.11.0.jar:::
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir
> /opt/flink/flink-1.11.0/conf -D
> taskmanager.memory.framework.off-heap.size=134217728b -D
> taskmanager.memory.network.max=134217730b -D
> taskmanager.memory.network.min=134217730b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=402653174b -D
> taskmanager.memory.task.off-heap.size=0b
>
> The mysql dependency in the maven project is:
>
> <dependency>
>    <groupId>mysql</groupId>
>    <artifactId>mysql-connector-java</artifactId>
>    <version>8.0.22</version>
>    <scope>provided</scope>
> </dependency>
>
> On Fri, Nov 20, 2020 at 10:07 AM Flavio Pompermaier 
> wrote:
>
>> no no I didn't relocate any class related to jdbc
>>
>> On Fri, 20 Nov 2020, 10:02 Arvid Heise  wrote:
>>
>>> I was particularly asking if you relocate classes. Since the property
>>> name looks like a class name, it could have been changed as well. Could you
>>> check the value of
>>> PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final
>>> jar?
>>>
>>> On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier 
>>> wrote:
>>>
The mysql connector is put in the client classpath and in the Flink lib
dir. When I debugged remotely, the AbandonedConnectionCleanupThread was
initialized at the first run of the job by the taskmanager. Today I'll try
to run the mysql connector in a standalone java app to see whether the property
is read correctly or not.

On Fri, 20 Nov 2020, 07:52 Arvid Heise  wrote:

> Hi Flavio,
>
> if it arrives in the java process then you are doing everything right
> already (or almost).
>
> Are you shading the mysql connector? I'm suspecting that the property
> also gets shaded then. You could decompile your jar to be sure. Have you
> verified that this is working as intended without Flink?
>
> On Thu, Nov 19, 2020 at 9:19 PM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> the properties arrive at the task manager because I can see them in
>> the java process (using ps aux)... or do you mean some special line of
>> code?
>>
>> On Thu, 19 Nov 2020, 20:53 Arvid Heise  wrote:
>>
>>> Hi Flavio,
>>>
>>> you are right, all looks good.
>>>
>>> Can you please verify if the properties arrived at the task

Re: Jdbc input format and system properties

2020-11-20 Thread Flavio Pompermaier
Yes, that's what is surprising... I already did a remote debug on the TM and
that property is not read... but that's really weird... could it be that the
JVM properties get cleared before the tasks are invoked?

On Fri, 20 Nov 2020, 12:50 Arvid Heise  wrote:

> All looks good and as it should be.
>
> Can you do a remote debugging session to the tm once more and check
> Boolean.getBoolean("com.mysql.disableAbandonedConnectionCleanup")
>
> There is no magic involved in System properties in Flink. If the property
> is set on the process, the configuration works. If it's in the project,
> mysql can access it.
>
> On Fri, Nov 20, 2020 at 10:46 AM Flavio Pompermaier 
> wrote:
>
>> I've just tested the following code in a java class and the property
>> (-Dcom.mysql.cj.disableAbandonedConnectionCleanup=true)  is read correctly
>> and the abandonedConnectionCleanupDisabled does not initialize
>> the cleanupThreadExecutorService (that in my other test was causing a
>> dynamic classloading memory leak):
>>
>>  try {
>>   Class.forName(drivername);
>>   Connection dbConn = DriverManager.getConnection(dbURL, username,
>> password);
>>   System.out.println("OK");
>> } catch (SQLException se) {
>>   throw new IllegalArgumentException("open() failed." +
>> se.getMessage(), se);
>> } catch (ClassNotFoundException cnfe) {
>>   throw new IllegalArgumentException("JDBC-Class not found. - " +
>> cnfe.getMessage(), cnfe);
>> }
>>
>> This is the output of ps aux command (I can also notice that the
>> taskmanager parameters are created using a space after the -D option that I
>> don't know if it intended or not):
>>
>> flink23904 59.5  1.1 5488264 380396 pts/5  Sl   10:41   0:08 java
>> -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458
>> -XX:MaxMetaspaceSize=268435456
>> -Dcom.mysql.disableAbandonedConnectionCleanup=true
>> -Dlog.file=/opt/flink/flink-1.11.0/log/flink-flink-taskexecutor-0-flavio-ThinkPad-P50.log
>> -Dlog4j.configuration=file:/opt/flink/flink-1.11.0/conf/log4j.properties
>> -Dlog4j.configurationFile=file:/opt/flink/flink-1.11.0/conf/log4j.properties
>> -Dlogback.configurationFile=file:/opt/flink/flink-1.11.0/conf/logback.xml
>> -classpath
>> /opt/flink/flink-1.11.0/lib/flink-csv-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-json-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/flink-1.11.0/lib/flink-table_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-table-blink_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-core-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/flink-1.11.0/lib/mariadb-java-client-2.6.0.jar:/opt/flink/flink-1.11.0/lib/mssql-jdbc-7.4.1.jre11.jar:/opt/flink/flink-1.11.0/lib/mysql-connector-java-8.0.22.jar:/opt/flink/flink-1.11.0/lib/flink-dist_2.12-1.11.0.jar:::
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir
>> /opt/flink/flink-1.11.0/conf -D
>> taskmanager.memory.framework.off-heap.size=134217728b -D
>> taskmanager.memory.network.max=134217730b -D
>> taskmanager.memory.network.min=134217730b -D
>> taskmanager.memory.framework.heap.size=134217728b -D
>> taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D
>> taskmanager.memory.task.heap.size=402653174b -D
>> taskmanager.memory.task.off-heap.size=0b
>>
>> The mysql dependency in the maven project is:
>>
>> <dependency>
>>    <groupId>mysql</groupId>
>>    <artifactId>mysql-connector-java</artifactId>
>>    <version>8.0.22</version>
>>    <scope>provided</scope>
>> </dependency>
>>
>> On Fri, Nov 20, 2020 at 10:07 AM Flavio Pompermaier 
>> wrote:
>>
>>> no no I didn't relocate any class related to jdbc
>>>
>>> On Fri, 20 Nov 2020, 10:02 Arvid Heise  wrote:
>>>
 I was particularly asking if you relocate classes. Since the property
 name looks like a class name, it could have been changed as well. Could you
 check the value of
 PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final
 jar?

 On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier <
 pomperma...@okkam.it> wrote:

> The mysql connector is put in the client classpath and in the Flink
> lib dir. When I debugged remotely, the AbandonedConnectionCleanupThread
> was initialized at the first run of the job by the taskmanager. Today I'll
> try to run the mysql connector in a standalone java app to see whether the
> property is read correctly or not.
>
> On Fri, 20 Nov 2020, 07:52 Arvid Heise  wrote:
>
>> Hi Flavio,
>>
>> if it arrives in the java process then you are doing everything right
>> already (or almost).
>>
>> Are you shading the mysql connector? I'm suspecting that the property
>> also get shaded then. You could decompile your jar to be sure. Have you
>> verified that this is working as intended without Flink?
>>
>> On Thu, Nov 19, 2020 at 9:19 PM Flavio Pompermaier <
>> pomperma...@okkam.it> wrote:

Re: Jdbc input format and system properties

2020-11-20 Thread Arvid Heise
No magic for JVM properties afaik (and I just looked in the code base for
the most obvious candidates). There is also nothing to gain from
overwriting properties.
I'm also certain that it should work as it's used in most secured setups to
inject keys/keytabs.

What happens if you execute the Flink program in a local executor or mini
cluster? Could you set a breakpoint in the static initializer of
AbandonedConnectionCleanupThread and check what's going on there?



On Fri, Nov 20, 2020 at 12:58 PM Flavio Pompermaier 
wrote:

> Yes, that's what is surprising..I already did a remote debug on the TM and
> that property is not read..but that's really weird..could it be that the
> JVM properties gets cleared before invoking the tasks?
>
>> On Fri, 20 Nov 2020, 12:50 Arvid Heise  wrote:
>
>> All looks good and as it should be.
>>
>> Can you do a remote debugging session to the tm once more and check
>> Boolean.getBoolean("com.mysql.disableAbandonedConnectionCleanup")
>>
>> There is no magic involved in System properties in Flink. If the property
>> is set on the process, the configuration works. If it's in the project,
>> mysql can access it.
>>
>> On Fri, Nov 20, 2020 at 10:46 AM Flavio Pompermaier 
>> wrote:
>>
>>> I've just tested the following code in a java class and the property
>>> (-Dcom.mysql.cj.disableAbandonedConnectionCleanup=true)  is read correctly
>>> and the abandonedConnectionCleanupDisabled does not initialize
>>> the cleanupThreadExecutorService (that in my other test was causing a
>>> dynamic classloading memory leak):
>>>
>>>  try {
>>>   Class.forName(drivername);
>>>   Connection dbConn = DriverManager.getConnection(dbURL, username,
>>> password);
>>>   System.out.println("OK");
>>> } catch (SQLException se) {
>>>   throw new IllegalArgumentException("open() failed." +
>>> se.getMessage(), se);
>>> } catch (ClassNotFoundException cnfe) {
>>>   throw new IllegalArgumentException("JDBC-Class not found. - " +
>>> cnfe.getMessage(), cnfe);
>>> }
>>>
>>> This is the output of ps aux command (I can also notice that the
>>> taskmanager parameters are created using a space after the -D option that I
>>> don't know if it intended or not):
>>>
>>> flink23904 59.5  1.1 5488264 380396 pts/5  Sl   10:41   0:08 java
>>> -Xmx536870902 -Xms536870902 -XX:MaxDirectMemorySize=268435458
>>> -XX:MaxMetaspaceSize=268435456
>>> -Dcom.mysql.disableAbandonedConnectionCleanup=true
>>> -Dlog.file=/opt/flink/flink-1.11.0/log/flink-flink-taskexecutor-0-flavio-ThinkPad-P50.log
>>> -Dlog4j.configuration=file:/opt/flink/flink-1.11.0/conf/log4j.properties
>>> -Dlog4j.configurationFile=file:/opt/flink/flink-1.11.0/conf/log4j.properties
>>> -Dlogback.configurationFile=file:/opt/flink/flink-1.11.0/conf/logback.xml
>>> -classpath
>>> /opt/flink/flink-1.11.0/lib/flink-csv-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-json-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/flink-1.11.0/lib/flink-table_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/flink-table-blink_2.12-1.11.0.jar:/opt/flink/flink-1.11.0/lib/log4j-1.2-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-api-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-core-2.12.1.jar:/opt/flink/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar:/opt/flink/flink-1.11.0/lib/mariadb-java-client-2.6.0.jar:/opt/flink/flink-1.11.0/lib/mssql-jdbc-7.4.1.jre11.jar:/opt/flink/flink-1.11.0/lib/mysql-connector-java-8.0.22.jar:/opt/flink/flink-1.11.0/lib/flink-dist_2.12-1.11.0.jar:::
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner --configDir
>>> /opt/flink/flink-1.11.0/conf -D
>>> taskmanager.memory.framework.off-heap.size=134217728b -D
>>> taskmanager.memory.network.max=134217730b -D
>>> taskmanager.memory.network.min=134217730b -D
>>> taskmanager.memory.framework.heap.size=134217728b -D
>>> taskmanager.memory.managed.size=536870920b -D taskmanager.cpu.cores=1.0 -D
>>> taskmanager.memory.task.heap.size=402653174b -D
>>> taskmanager.memory.task.off-heap.size=0b
>>>
>>> The mysql dependency in the maven project is:
>>>
>>> <dependency>
>>>    <groupId>mysql</groupId>
>>>    <artifactId>mysql-connector-java</artifactId>
>>>    <version>8.0.22</version>
>>>    <scope>provided</scope>
>>> </dependency>
>>>
>>> On Fri, Nov 20, 2020 at 10:07 AM Flavio Pompermaier <
>>> pomperma...@okkam.it> wrote:
>>>
 no no I didn't relocate any class related to jdbc

On Fri, 20 Nov 2020, 10:02 Arvid Heise  wrote:

> I was particularly asking if you relocate classes. Since the property
> name looks like a class name, it could have been changed as well. Could 
> you
> check the value of
> PropertyDefinitions.SYSP_disableAbandonedConnectionCleanup in your final
> jar?
>
> On Fri, Nov 20, 2020 at 9:35 AM Flavio Pompermaier <
> pomperma...@okkam.it> wrote:
>
>> The mysql connector is put in the client classpath and in the Flink
>> lib dir. When I debugged remotely, the AbandonedConnectionCleanupThread
>> was initialized at the first run of the job by the taskmanager. Today
>>>

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply.

I want to join two streams, A and B. Items in stream A come in first,
and I keep them in a memory cache as join key and item; then, several
minutes later, the items in stream B come in and the join work is
performed. The timestamp of the latest expired item in the memory cache is the
safe rollback timestamp; I can resume source A from that timestamp when I
restart.

It's not very precise, and it may lose some items or send some items twice, but
it seems useful to me in my situation. But if the job restarts and both source A and
source B resume from the last consumed offset, several minutes of
join results will be missing, which is unacceptable.

The topology I am considering is like:

source A -> parser --shuffle--> join -> sink
source B -> parser ...(parallel)  |--->timestampcalculator

The memory cache lives inside the join operator, and the join operator will broadcast the
timestamp of the latest expired cache item to the timestampcalculator. Then the
timestampcalculator will use these timestamps to calculate a safe rollback timestamp (a
moving minimum) from which source A can resume; source B will
also restart from that timestamp. I will add a bloom filter to the sink's state
to avoid duplicate items.

So I want the timestampcalculator operator and source A to be located in
one TM, so that I can send this timestamp from the timestampcalculator to source A
via a static variable.

I hope I have made my problem clear despite my poor English; it seems a little
tricky. But I think it's the only way to join the two streams while avoiding
storing a very huge state.
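
For what it's worth, wiring up Matthias' co-location suggestion could look roughly like
the sketch below. setCoLocationGroupKey is an internal API, and the socket source and the
map function are only stand-ins for the real Kafka source and timestamp calculator:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CoLocationSketch {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // stand-in for source A (the real job would use a Kafka consumer here)
    DataStream<String> sourceA = env.socketTextStream("localhost", 9999);

    // stand-in for the timestamp calculator
    SingleOutputStreamOperator<Long> timestampCalculator =
        sourceA.map(new MapFunction<String, Long>() {
          @Override
          public Long map(String value) {
            return (long) value.length();
          }
        });

    // subtask i of both transformations is then placed in the same slot (and thus the same TM)
    sourceA.getTransformation().setCoLocationGroupKey("sourceA-and-calculator");
    timestampCalculator.getTransformation().setCoLocationGroupKey("sourceA-and-calculator");

    timestampCalculator.print();
    env.execute("co-location sketch");
  }
}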



Arvid Heise  wrote on Fri, 20 Nov 2020 at 2:58 PM:

> I still haven't fully understood. Do you mean you can't infer the
> timestamp in source A because it depends on some internal field of source B?
>
> How is that actually working in a parallel setting? Which timestamp is
> used in the different instances of a source?
>
> Say, we have task A1 which is the first subtask of source A and task B2 as
> the second subtask of source B. How would you like them to be located? How
> does that correlate to the third subtask of the join (let's call it J3).
>
> Remember that through the shuffling before the join there is no clear
> correlation between any subtask of A or B to J...
>
> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:
>
>> Thanks for your help!
>>
>> Now the timestamps already go with the items in streaming. My streaming
>> pipeline is like this:
>>
>> source -> parser --shuffle--> join -> sink
>>
>> Streaming A and streaming B go through this pipeline, I keep logs in
>> streaming A in memory cache (linkedHashmap) in join operator, then all logs
>> in streaming B tries to lookup up the cache and perform the actual join
>> work.
>>
>> I try to use the timestamp of the latest expired item in memory as a safe
>> rollback timestamp; if I restart the job, the source should use this timestamp
>> as the start offset. The safe rollback timestamp is calculated in the join operator,
>> but I want to use it in the source. So the simplest way to pass this
>> information from the join operator to the source is to use a static variable, which
>> requires that the source operator and the join operator are always located in the same TM process.
>>
>> Arvid Heise  wrote on Fri, 20 Nov 2020 at 3:33 AM:
>>
>>> Hi Si-li,
>>>
>>> couldn't you also add the timestamp as a state to the source? So the
>>> time would store the timestamp of the last emitted record.
>>> It's nearly identical to your solution but would fit the recovery model
>>> of Flink much better.
>>> If you want to go further back to account for the records that have been
>>> actually processed in the join, you could also replay the data from >> timestamp> - .
>>>
>>> On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:
>>>
 Thanks, I'll try it.

Matthias Pohl  wrote on Sat, 14 Nov 2020 at 12:53 AM:

> Hi Si-li,
> trying to answer your initial question: Theoretically, you could try
> using the co-location constraints to achieve this. But keep in mind that
> this might lead to multiple Join operators running in the same JVM 
> reducing
> the amount of memory each operator can utilize.
>
> Best,
> Matthias
>
> On Mon, Nov 9, 2020 at 4:23 AM Si-li Liu  wrote:
>
>> Thanks for your reply.
>>
>> It's a streaming job, and the join operator is doing the actual join work. The
>> join state is too large, so I don't want to keep the state using
>> the mechanism that Flink provides, and I also don't need a very precise
>> join.
>> So I prefer to let the join operator calculate a backward timestamp as
>> state; if the cluster restarts, the consumer can use
>> setStartFromTimestamp
>> to start from that timestamp.
>>
>> Now my problem is that the consumer can't read the state that the join operator
>> has written, so I need a way to send a small message (a 64-bit long) from
>> downstream
>> to upstream. Redis may be a solution, but adding an external dependency is a
>> secondary option if I can pass this message through memory.
>>
>>
>> Ches

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread Timo Walther

Hi Fuyao,

sorry for not replying earlier.

You posted a lot of questions. I scanned the thread quickly, let me try 
to answer some of them and feel free to ask further questions afterwards.


"is it possible to configure the parallelism for Table operation at 
operator level"


No, this is not possible at the moment. The reasons are 1) we don't know
how to expose such functionality in a nice way; maybe we will use SQL
hints in the future [1]. 2) Sometimes the planner sets the parallelism of
operators explicitly to 1. All other operators will use the globally
defined parallelism for the pipeline (also to not mess up retraction
messages internally). You will be able to set the parallelism of the
sink operation in Flink 1.12.


"BoundedOutOfOrderness Watermark Generator is NOT making the event time 
to advance"


Have you checked if you can use an interval join instead of a full join 
with state retention? Table/SQL pipelines that don't preserve a time 
attribute in the end might also erase the underlying watermarks. Thus, 
event time triggers will not work after your join.


"Why can't I update the watermarks for all 8 parallelisms?"

You could play around with idleness for your source [2]. Or you set the 
source parallelism to 1 (while keeping the rest of the pipeline globally 
set to 8), would that be an option?
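
For reference, a watermark strategy with idleness looks roughly like this (MyEvent is a
placeholder POJO with a getEventTime() accessor; the durations are made up):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<MyEvent> strategy =
    WatermarkStrategy.<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
        // an idle source/partition stops holding back the overall watermark after 1 minute
        .withIdleness(Duration.ofMinutes(1))
        .withTimestampAssigner((event, previousTimestamp) -> event.getEventTime());

// used either in env.fromSource(...) or stream.assignTimestampsAndWatermarks(strategy)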


"Some type cast behavior of retracted streams I can't explain."

toAppendStream/toRetractStream still need an update to the new type 
system. This is explained in FLIP-136 which will be part of Flink 1.13 [3].


I hope I could help a bit.

Regards,
Timo


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
[3] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-136%3A++Improve+interoperability+between+DataStream+and+Table+API


On 13.11.20 21:39, Fuyao Li wrote:

Hi Matthias,

Just to provide more context on this problem. I only have 1 partition 
per each Kafka Topic at the beginning before the join operation. After 
reading the doc: 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-and-timestamp-extractionwatermark-emission 



Maybe that is the root cause of my problem here, with less than 8 
partitions (only 1 partition in my case), using the default 
parallelism of 8 will cause this wrong behavior. This is my guess, it 
takes a while to test it out... What's your opinion on this? Thanks!


Best,

Fuyao


On Fri, Nov 13, 2020 at 11:57 AM Fuyao Li  wrote:


Hi Matthias,

One more question regarding Flink Table parallelism: is it possible
to configure the parallelism for a Table operation at the operator level?
It seems we don't have such an API available, right? Thanks!

Best,
Fuyao

On Fri, Nov 13, 2020 at 11:48 AM Fuyao Li <fuyaoli2...@gmail.com> wrote:

Hi Matthias,

Thanks for your information. I have managed to figure out the
first issue you mentioned. Regarding the second issue, I have
made some progress on it.

I have sent another email with the title 'BoundedOutOfOrderness
Watermark Generator is NOT making the event time to advance'
using another email of mine, fuyao...@oracle.com. That email
contains some more context on my issue. Please take a look. I
have made some progress after sending that new email.

Previously, I had managed to make the time-lag watermark strategy
work in my code, but my bounded-out-of-orderness strategy and
punctuated watermark strategy didn't work well: they produced 8
watermarks each time. Two cycles are shown below.

I managed to figure out that the root cause is that the Flink stream
execution environment has a default parallelism of 8. *I didn't
notice this in the doc; could the community add it explicitly to
the official doc to avoid some confusion? Thanks.*

From my understanding, the watermark advances based on the
lowest watermark among the 8, so I cannot advance the
bounded-out-of-orderness watermark since I am only advancing one of
the 8 parallel subtasks. If I set the entire stream execution
environment to parallelism 1, it reflects the watermark in the
context correctly. One more thing is that this behavior is not
reflected in the Flink cluster web UI. I can see the
watermark advancing there, but it is not advancing in reality. *That's
causing the inconsistency problem I mentioned in the other email
referenced above. Will this be considered a bug in the UI?*

My current quest

Non uniform distribution of subtasks even with cluster.evenly-spread-out-slots

2020-11-20 Thread Harshit Hajela
Hi Flink Community,

I'm currently running a heavy flink job on Flink 1.9.3 that has a lot of
subtasks and observing some subtask distribution issues. The job in
question has 9288 sub tasks and they are running on a large set of TMs
(total available slots are 1792).

I'm using the *cluster.evenly-spread-out-slots* configuration option to
have the slots be allocated uniformly but I am still seeing non-uniform
subtask distribution that seems to be affecting performance. It looks like
some TMs are being overloaded and seeing a much greater than uniform
allocation of subtasks. I've been trying to reproduce this situation at a
smaller scale but have not been successful in doing so.

As part of debugging the scheduling process when trying to reproduce this
at a smaller scale I observed that the non-location preference
selectWithoutLocationPreference override introduced by the evenly spread
out strategy option is not being invoked at all as the execution vertices
still have a location preference to be assigned the same slots as their
input vertices.

This happens at job startup time and not during recovery, so I'm not sure
if recovery is where the non preference code path is invoked. In essence
the only impact of using the evenly spread out strategy seems to be a
slightly different candidate score calculation.

I wanted to know:-
1. Is the evenly spread out strategy the right option to choose for
achieving the uniform distribution of subtasks?
2. Is the observed scheduling behaviour expected for the evenly spread out
strategy? When do we expect the non location preference code path to be
invoked? For us this only happens on sources since they have no incoming
edges.

Apart from this I am still trying to understand the nature of scheduling in
Flink and how that could bring about this situation, I was wondering if
there were known issues or peculiarities of the Flink job scheduler that
could lead to this situation occurring. For example I'm looking at the
known issues mentioned in the ticket
https://issues.apache.org/jira/browse/FLINK-11815
.

I was hoping to understand :-
1. The conditions that would give rise to these kinds of situations or how
to verify if we are running into them. For example, how to verify that key
group allocation is non-uniform
2. If these issues have been addressed in subsequent versions of flink
3. If there is any other information about the nature of scheduling jobs in
flink that could give rise to the non-uniform distribution observed.

Please let me know if further information needs to be provided.

Thanks,
Harshit


Re: Dynamic ad hoc query deployment strategy

2020-11-20 Thread lalala
Hi Kostas,

Thank you for your response.

Is what you are saying valid for session mode? If I submit my jobs to the
existing Flink session, will they be able to share the sources?

We do register our Kafka tables in a `GenericInMemoryCatalog`, and the
documentation says `The GenericInMemoryCatalog is an in-memory
implementation of a catalog. All objects will be available only for the
lifetime of the session.`. I presume that, in session mode, we can share a
Kafka source across multiple SQL jobs?

That is not what we wanted in terms of isolation, but if it is not possible
with Flink, we are also fine with session mode.

Best regards,





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
Hi,
I added my custom jar (that includes dependencies on Jackson) to the Flink
classpath. It seems to be loaded just fine. But when the job starts I am
getting the exception below. I am not sure how to interpret the exception,
though, and would appreciate it if somebody could give me advice on it.
Thanks
Alex

2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task [] -
Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
(dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig
.getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11.2
.jar:1.11.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2.jar:
1.11.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain
.createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2.jar:
1.11.2]
at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(
OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask
.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
[flink-dist_2.11-1.11.2.jar:1.11.2]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
Caused by: java.lang.ClassCastException: cannot assign instance of
java.util.concurrent.ConcurrentHashMap to field
com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
of type com.fasterxml.jackson.databind.util.LRUMap in instance of
com.fasterxml.jackson.databind.deser.DeserializerCache
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
ObjectStreamClass.java:2301) ~[?:1.8.0_265]
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:
1431) ~[?:1.8.0_265]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2372) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2148) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
~[?:1.8.0_265]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2366) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2148) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
~[?:1.8.0_265]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2366) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2148) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
~[?:1.8.0_265]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2366) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2148) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
~[?:1.8.0_265]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2366) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2148) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
~[?:1.8.0_265]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2366) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2148) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
~[?:1.8.0_265]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
2366) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2290)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:
2148) ~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
~[?:1.8.0_265]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:483) ~[?:
1.8.0_265]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:441) ~[?:
1.8.

Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Arvid Heise
Your topology is definitely interesting and makes sense to me on a high
level. The main question remaining is the parallelism. I'm assuming you run
your pipeline with parallelism p and both source A and timestampcalculator
T are run with parallelism p. You want to create a situation where for each A_i
there is a T_i which runs in the same slot. Am I right?

If so, then as you have noticed, there is currently no way to express
that in Flink on a high level. One more idea before trying to solve it in a
hacky way: how large is B? Could you use a broadcast to avoid the shuffle on A?
I'm thinking of creating a pipeline A->J(side input B)->T, because then
it's easy to produce an operator chain, where everything even runs within
the same thread.
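
For what it's worth, a rough sketch of that broadcast shape (EventA, EventB,
Joined, and getJoinKey() are placeholder types/methods; this only illustrates
the A->J(side input B)->T chaining idea, not a complete solution):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, EventB> bDescriptor = new MapStateDescriptor<>(
    "side-input-b", BasicTypeInfo.STRING_TYPE_INFO, TypeInformation.of(EventB.class));

BroadcastStream<EventB> broadcastB = streamB.broadcast(bDescriptor);

DataStream<Joined> joined = streamA
    .connect(broadcastB)
    .process(new BroadcastProcessFunction<EventA, EventB, Joined>() {
        @Override
        public void processElement(EventA a, ReadOnlyContext ctx, Collector<Joined> out) throws Exception {
            EventB b = ctx.getBroadcastState(bDescriptor).get(a.getJoinKey());
            // join a with b here (or buffer a if b has not arrived yet)
        }

        @Override
        public void processBroadcastElement(EventB b, Context ctx, Collector<Joined> out) throws Exception {
            ctx.getBroadcastState(bDescriptor).put(b.getJoinKey(), b);
        }
    });
// The timestampcalculator T can then be chained right after this operator
// (same parallelism, no keyBy in between), so A, the join, and T share a slot.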

On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:

> Thanks for your reply.
>
> I want to join two streams, A and B. Items in stream A come in first,
> and I keep them in a memory cache as (join key, item); then several
> minutes later the items in stream B come in and the join work is
> performed. The timestamp of the latest expired item in the memory cache is
> the safe rollback timestamp; I can resume source A from that timestamp when
> I restart.
>
> It's not very precise, and it may lose some items or send some items twice,
> but it seems useful to me in my situation. But if the job restarts and both
> source A and source B resume from the last consumed offsets, several
> minutes of join results will be missing, which is unacceptable.
>
> The topology I have in mind is like this:
>
> source A -> parser --shuffle--> join -> sink
> source B -> parser ...(parallel)  |--->timestampcalculator
>
> The memory cache lives inside the join operator; the join operator will
> broadcast the timestamp of the latest expired cache item to the
> timestampcalculator. The timestampcalculator will then use these timestamps
> to calculate a safe rollback timestamp (a moving minimum), so that source A
> can resume from that timestamp and source B will also restart from that
> timestamp. I will add a bloom filter in the sink's state to avoid duplicate
> items.
>
> So I want the timestampcalculator operator and source A to be located in
> one TM, so that I can send this timestamp from the timestampcalculator to
> source A via a static variable.
>
> I hope I make my problem clear with my poor English; it seems a little
> tricky. But I think it's the only way to join the two streams while
> avoiding storing a very huge state.
>
>
>
> On Fri, Nov 20, 2020 at 2:58 PM, Arvid Heise  wrote:
>
>> I still haven't fully understood. Do you mean you can't infer the
>> timestamp in source A because it depends on some internal field of source B?
>>
>> How is that actually working in a parallel setting? Which timestamp is
>> used in the different instances of a source?
>>
>> Say, we have task A1 which is the first subtask of source A and task B2
>> as the second subtask of source B. How would you like them to be located?
>> How does that correlate to the third subtask of the join (let's call it J3).
>>
>> Remember that through the shuffling before the join there is no clear
>> correlation between any subtask of A or B to J...
>>
>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:
>>
>>> Thanks for your help!
>>>
>>> Now the timestamps already go with the items in streaming. My streaming
>>> pipeline is like this:
>>>
>>> source -> parser --shuffle--> join -> sink
>>>
>>> Streaming A and streaming B go through this pipeline. I keep the logs of
>>> streaming A in a memory cache (a LinkedHashMap) in the join operator, and
>>> then all logs in streaming B try to look up the cache and perform the
>>> actual join work.
>>>
>>> I try to use the timestamp of the latest expired item in memory as a
>>> safe rollback timestamp: if I restart the job, the source should use this
>>> timestamp as the start offset. The safe rollback timestamp is calculated in
>>> the join operator, but I want to use it in the source. So the simplest way
>>> to pass this information from the join operator to the source is to use a
>>> static variable, which requires that the source operator and the join
>>> operator always run in the same TM process.
>>>
>>> On Fri, Nov 20, 2020 at 3:33 AM, Arvid Heise  wrote:
>>>
 Hi Si-li,

 couldn't you also add the timestamp as a state to the source? So the
 state would store the timestamp of the last emitted record.
 It's nearly identical to your solution but would fit the recovery model
 of Flink much better.
 If you want to go further back to account for the records that have
 been actually processed in the join, you could also replay the data from
  - .

 On Mon, Nov 16, 2020 at 8:39 AM Si-li Liu  wrote:

> Thanks, I'll try it.
>
> On Sat, Nov 14, 2020 at 12:53 AM, Matthias Pohl  wrote:
>
>> Hi Si-li,
>> trying to answer your initial question: Theoretically, you could try
>> using the co-location constraints to achieve this. But keep in mind that
>> this might lead to multiple Join operators running in the same JVM 
>> reducing
>> the amount of memory each operator can utilize.
>>
>> Best,
>>>

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li

Hi Timo,

Thanks for your reply! I think your suggestions are really helpful! The 
good news is that I had managed to figure out some of it by myself a few 
days ago.


1. Thanks for the update about the table parallelism issue!

2. After trying out the idleness setting: it prevents idle subtasks 
from blocking the pipeline's overall watermark, and it works for me. 
Based on my observation and on reading the source code, I have summarized 
some notes. Please correct me if I am wrong.


1. Watermarks are independent within each subtask of a Flink operator.
2. The watermark of a multi-parallelism table operator is always
   dominated by the lowest watermark of the currently *ACTIVE* subtasks.
3. With withIdleness() configured, a subtask will be marked as idle if
   it hasn't received a message for the configured period of time. It will
   NOT execute onPeriodicEmit() and emit watermarks after reaching the idle
   state. Between [the start of the application / receiving a new message]
   and [reaching the idle state], onPeriodicEmit() will still emit
   watermarks and dominate the overall context watermark if it holds the
   smallest watermark among the subtasks.
4. Once an idle subtask receives a new message, it will switch its
   status from idle to active and start to influence the overall
   context watermark.

3. In order to route the correct information to the right subtask in the join 
step, I have added keyBy() logic in the source based on the join key 
used in the join step. It seems to work correctly and routes the messages 
to the correct place.


4. For the interval join, I think I can't use it directly since I need 
to use a full outer join so as not to lose any information from any upstream 
datastream. I think the interval join is an inner join, so it can't do this 
task. I guess my only option is to do a full outer join with a query 
configuration.


5. One more question about the data replay issue. I read the Ververica 
blog 
(https://www.ververica.com/blog/replayable-process-functions-time-ordering-and-timers) 
and I think that with the replay use case we will face some similar issues. 
I think the suggested approach mentioned there:


  (1) puts each incoming track record in a map keyed by its timestamp, and

  (2) creates an event timer to process that record once the watermark 
hits that point.


I kind of understand the idea here: buffer all the data (maybe deleting 
some of the old track records once processed) in a track ordered by timestamp 
and trigger the event timers sequentially over this buffered track.


Based on my understanding, this buffered design is only suitable for 
*offline* data processing, right? (It is a waste of resources to buffer 
this in real time.)


Also, from the article, I think they are using a periodic watermark 
strategy [1]. How can they process the last piece of data records with a 
periodic watermark strategy, since there is no more incoming data to 
advance the watermark? So the last piece of data will never be processed 
here? Is there a way to handle this gracefully? My use case doesn't 
allow me to lose any information.



[1]https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

Best,

Fuyao


On 11/20/20 08:55, Timo Walther wrote:

Hi Fuyao,

sorry for not replying earlier.

You posted a lot of questions. I scanned the thread quickly, let me 
try to answer some of them and feel free to ask further questions 
afterwards.


"is it possible to configure the parallelism for Table operation at 
operator level"


No, this is not possible at the moment. The reasons are 1) we don't know 
how to expose such functionality in a nice way (maybe we will use 
SQL hints in the future [1]), and 2) sometimes the planner sets the 
parallelism of operators explicitly to 1. All other operators will use 
the globally defined parallelism for the pipeline (also to not mess up 
retraction messages internally). You will be able to set the 
parallelism of the sink operation in Flink 1.12.


"BoundedOutOfOrderness Watermark Generator is NOT making the event 
time to advance"


Have you checked if you can use an interval join instead of a full 
join with state retention? Table/SQL pipelines that don't preserve a 
time attribute in the end might also erase the underlying watermarks. 
Thus, event time triggers will not work after your join.


"Why can't I update the watermarks for all 8 parallelisms?"

You could play around with idleness for your source [2]. Or you set 
the source parallelism to 1 (while keeping the rest of the pipeline 
globally set to 8), would that be an option?


"Some type cast behavior of retracted streams I can't explain."

toAppendStream/toRetractStream still need an update to the new type 
system. This is explained in FLIP-136 which will be part of Flink 1.13 
[3].


I hope I could help a bit.

Regards,
Timo


[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-113%3A+Supports+Dynamic+Table+Options+for+Flink+SQL

Re: Questions on Table to Datastream conversion and onTimer method in KeyedProcessFunction

2020-11-20 Thread fuyao . li

Hi Timo,

One more question, the blog also mentioned a jira task to solve this 
issue. https://issues.apache.org/jira/browse/FLINK-10886. Will this 
feature be available in 1.12? Thanks!


Best,

Fuyao

On 11/20/20 11:37, fuyao...@oracle.com wrote:


Hi Timo,

Thanks for your reply! I think your suggestions are really helpful! The 
good news is that I had managed to figure out some of it by myself a 
few days ago.


1. Thanks for the update about the table parallelism issue!

2. After trying out the idleness setting. It prevents some idle 
subtasks from blocking the pipeline's overall watermark and it works 
for me. Based on my observation and reading the source code, I have 
summarized some notes. Please correct me if I am wrong.


 1. Watermarks are independent within each subtask of a Flink operator.
 2. The watermark of a multi-parallelism table operator is always
    dominated by the lowest watermark of the currently *ACTIVE* subtasks.
 3. With withIdleness() configured, a subtask will be marked as idle
    if it hasn't received a message for the configured period of time. It
    will NOT execute onPeriodicEmit() and emit watermarks after reaching
    the idle state. Between [the start of the application / receiving a
    new message] and [reaching the idle state], onPeriodicEmit() will
    still emit watermarks and dominate the overall context watermark if it
    holds the smallest watermark among the subtasks.
 4. Once an idle subtask receives a new message, it will switch its
    status from idle to active and start to influence the overall
    context watermark.

3. In order to route the correct information to the subtask in the 
join step, I have added the keyed() logic in the source based on the 
join key in the join step. It seems to work correctly and could route 
the message to a current place.


4. For the interval join, I think I can't use it directly since I need 
to use a full outer join so as not to lose any information from any upstream 
datastream. I think the interval join is an inner join, so it can't do this 
task. I guess my only option is to do a full outer join with a query 
configuration.


5. One more question about the data replay issue. I read the ververica 
blog 
(https://www.ververica.com/blog/replayable-process-functions-time-ordering-and-timers) 
and I think with replay use case, we will face some similar issues. I 
think the suggested approach mentioned


  (1). Puts each incoming track record in a map keyed by its timestamp

  (2). creates an event timer to process that record once the 
watermark hits that point.


I kind of understand the idea here. Buffer all the data(maybe delete 
some of the old track if processed) in a track ordered by timestamp 
and trigger the event timer sequentially with this buffered track.


Based on my understanding, this buffered design is only suitable for 
*offline* data processing, right? (It is a waste of resource to buffer 
this in real time. )


Also, from the article, I think they are using periodic watermark 
strategy[1]. how can they process the last piece of data records with 
periodic watermark strategy since there is no more incoming data to 
advance the watermark? So the last piece of data will never be 
processed here? Is there a way to gracefully handle this? My use case 
doesn't allow me to lose any information.



[1]https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#writing-a-periodic-watermarkgenerator

Best,

Fuyao


On 11/20/20 08:55, Timo Walther wrote:

Hi Fuyao,

sorry for not replying earlier.

You posted a lot of questions. I scanned the thread quickly, let me 
try to answer some of them and feel free to ask further questions 
afterwards.


"is it possible to configure the parallelism for Table operation at 
operator level"


No, this is not possible at the moment. The reasons are 1) we don't know 
how to expose such functionality in a nice way (maybe we will use 
SQL hints in the future [1]), and 2) sometimes the planner sets the 
parallelism of operators explicitly to 1. All other operators will 
use the globally defined parallelism for the pipeline (also to not 
mess up retraction messages internally). You will be able to set the 
parallelism of the sink operation in Flink 1.12.


"BoundedOutOfOrderness Watermark Generator is NOT making the event 
time to advance"


Have you checked if you can use an interval join instead of a full 
join with state retention? Table/SQL pipelines that don't preserve a 
time attribute in the end might also erase the underlying watermarks. 
Thus, event time triggers will not work after your join.


"Why can't I update the watermarks for all 8 parallelisms?"

You could play around with idleness for your source [2]. Or you set 
the source parallelism to 1 (while keeping the rest of the pipeline 
globally set to 8), would that be an option?


"Some type cast behavior of retracted streams I can't explain."

toAppendStream/toRetractStream still need an update to the new type

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Are you using ObjectMapper as a non-transient field? If so, please make it
transient and initialize in open() of a Rich*Function.
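
A minimal sketch of that pattern (MyEvent and the function name are
placeholders, not taken from the thread):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ParseJson extends RichMapFunction<String, MyEvent> {
    // transient: the mapper is never serialized as part of the job graph
    private transient ObjectMapper mapper;

    @Override
    public void open(Configuration parameters) {
        mapper = new ObjectMapper();
    }

    @Override
    public MyEvent map(String json) throws Exception {
        return mapper.readValue(json, MyEvent.class);
    }
}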

On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman 
wrote:

> Hi,
> I added my custom jar (that includes dependencies on Jackson) to Flink
> classpath. It seems to be loaded just fine. But when the job starts I am
> getting an exception below. I am not sure how to interpret the exception though
> and would appreciate it if somebody gives me advice on it.
> Thanks
> Alex
>
> 2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task []
> - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
> (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11.2
> .jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2
> .jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2
> .jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
> OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
> StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: java.lang.ClassCastException: cannot assign instance of
> java.util.concurrent.ConcurrentHashMap to field
> com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
> of type com.fasterxml.jackson.databind.util.LRUMap in instance of
> com.fasterxml.jackson.databind.deser.DeserializerCache
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> ObjectStreamClass.java:2301) ~[?:1.8.0_265]
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:
> 1431) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2372) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2290) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2148) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
> ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2366) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2290) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2148) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
> ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2366) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2290) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2148) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
> ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2366) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2290) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2148) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
> ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2366) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2290) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2148) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
> ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2366) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
> 2290) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2148) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
> ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:
> 2366) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream

Re: Filter Null in Array in SQL Connector

2020-11-20 Thread Rex Fenley
Btw, this is what our source and sink essentially look like, with some
columns redacted.

CREATE TABLE source_kafka_data (
id BIGINT,
roles ARRAY<STRING>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'topic' = 'topic',
'properties.bootstrap.servers' = 'kafka',
'properties.group.id' = 'group_id',
'properties.auto.offset.reset' = 'earliest',
'debezium-json.schema-include' = 'true',
'format' = 'debezium-json'
)


CREATE TABLE sink_es_data (
id BIGINT NOT NULL,
roles ARRAY<STRING>,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'eshost',
'index' = 'data',
'format' = 'json',
'sink.bulk-flush.max-actions' = '8192',
'sink.bulk-flush.max-size' = '16mb',
'sink.bulk-flush.interval' = '5000',
'sink.bulk-flush.backoff.delay' = '1000',
'sink.bulk-flush.backoff.max-retries' = '4',
'sink.bulk-flush.backoff.strategy' = 'CONSTANT'
)
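
In case it is useful as a workaround while the root cause is investigated,
here is a rough sketch of a scalar UDF that strips NULL entries from the
roles array before it reaches the sink. The function name, registration call,
and query are illustrative assumptions, not something from this thread, and
may need an explicit DataTypeHint depending on the planner version:

import java.util.Arrays;
import org.apache.flink.table.functions.ScalarFunction;

// hypothetical helper: drops null elements from a STRING array
public class RemoveNulls extends ScalarFunction {
    public String[] eval(String[] roles) {
        if (roles == null) {
            return new String[0];
        }
        return Arrays.stream(roles)
                .filter(r -> r != null)
                .toArray(String[]::new);
    }
}

// registration and usage would look roughly like:
// tableEnv.createTemporarySystemFunction("REMOVE_NULLS", RemoveNulls.class);
// INSERT INTO sink_es_data SELECT id, REMOVE_NULLS(roles) FROM source_kafka_data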



On Thu, Nov 19, 2020 at 7:41 PM Rex Fenley  wrote:

> Thanks!
>
> Update: We've confirmed with a test copy of our data now that if we remove
> all the null values from arrays everything works smoothly and as expected.
> So this definitely appears to be the culprit.
>
> On Thu, Nov 19, 2020 at 6:41 PM Jark Wu  wrote:
>
>> Thanks Rex! This is very helpful. Will check it out later.
>>
>>
>> On Fri, 20 Nov 2020 at 03:02, Rex Fenley  wrote:
>>
>>> Below is a highly redacted set of data that should represent the
>>> problem. As you can see, the "roles" field has "[null]" in it, a null value
>>> within the array. We also see in our DB corresponding rows like the
>>> following.
>>> id | roles
>>> ---+
>>>   16867433 | {NULL}
>>>
>>> We have confirmed that by not selecting "roles" all data passes through
>>> without failure on a single operator, but selecting "roles" will eventually
>>> always fail with java.lang.NullPointerException repeatedly. What is odd
>>> about this is there is 0 additional stack trace, just the exception, in our
>>> logs and in Flink UI. We only have INFO logging on, however, other
>>> exceptions we've encountered in our development have always revealed a
>>> stack trace.
>>>
>>> {
>>>   "schema": {
>>> "type": "struct",
>>> "fields": [
>>>   {
>>> "type": "struct",
>>> "fields": [
>>>   { "type": "int32", "optional": false, "field": "id" },
>>>   {
>>> "type": "array",
>>> "items": { "type": "string", "optional": true },
>>> "optional": false,
>>> "field": "roles"
>>>   },
>>> ],
>>> "optional": true,
>>> "name": "db.public.data.Value",
>>> "field": "before"
>>>   },
>>>   {
>>> "type": "struct",
>>> "fields": [
>>>   { "type": "int32", "optional": false, "field": "id" },
>>>   {
>>> "type": "array",
>>> "items": { "type": "string", "optional": true },
>>> "optional": false,
>>> "field": "roles"
>>>   },
>>> ],
>>> "optional": true,
>>> "name": "db.public.data.Value",
>>> "field": "after"
>>>   },
>>>   {
>>> "type": "struct",
>>> "fields": [
>>>   { "type": "string", "optional": false, "field": "version" },
>>>   { "type": "string", "optional": false, "field": "connector" },
>>>   { "type": "string", "optional": false, "field": "name" },
>>>   { "type": "int64", "optional": false, "field": "ts_ms" },
>>>   {
>>> "type": "string",
>>> "optional": true,
>>> "name": "io.debezium.data.Enum",
>>> "version": 1,
>>> "parameters": { "allowed": "true,last,false" },
>>> "default": "false",
>>> "field": "snapshot"
>>>   },
>>>   { "type": "string", "optional": false, "field": "db" },
>>>   { "type": "string", "optional": false, "field": "schema" },
>>>   { "type": "string", "optional": false, "field": "table" },
>>>   { "type": "int64", "optional": true, "field": "txId" },
>>>   { "type": "int64", "optional": true, "field": "lsn" },
>>>   { "type": "int64", "optional": true, "field": "xmin" }
>>> ],
>>> "optional": false,
>>> "name": "io.debezium.connector.postgresql.Source",
>>> "field": "source"
>>>   },
>>>   { "type": "string", "optional": false, "field": "op" },
>>>   { "type": "int64", "optional": true, "field": "ts_ms" },
>>>   {
>>> "type": "struct",
>>> "fields": [
>>>   { "type": "string", "optional": false, "field": "id" },
>>>   { "type": "int64", "optional": false, "field": "total_order" },
>>>   {
>>> "type": "int64",
>>> "optional": false,
>>> "field": "data_collection_order"
>>>   }
>>> ],
>>> "optional": true,
>>>  

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
The easiest solution for all non-trivial issues like this is to start the
application locally in a local executor, so you can debug in your IDE.

Additionally, double-check that you have no lambdas/anonymous classes that
reference outer classes with ObjectMapper. ObjectMapper should also be
static as it's fully immutable, so you can also check that.
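
For the static variant, a sketch could look like this (again only an
illustration; MyEvent is a placeholder type):

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.functions.MapFunction;

public class ParseJsonStatic implements MapFunction<String, MyEvent> {
    // shared per JVM and never serialized with the function instance
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public MyEvent map(String json) throws Exception {
        return MAPPER.readValue(json, MyEvent.class);
    }
}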

On Fri, Nov 20, 2020 at 8:55 PM Alexander Bagerman 
wrote:

> Thanks, Arvid,
> That is what I thought too. I went through all the instances where it
> might 've been a member variable and made sure that it's declared as
> transient :-( Is there anything else I can check?
> Alex
>
> On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise  wrote:
>
>> Are you using ObjectMapper as a non-transient field? If so, please make
>> it transient and initialize in open() of a Rich*Function.
>>
>> On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman 
>> wrote:
>>
>>> Hi,
>>> I added my custom jar (that includes dependencies on Jackson) to Flink
>>> classpath. It seems to be loaded just fine. But when the job starts I am
>>> getting an exception below. I am not sure how to interpret the exception though
>>> and would appreciate it if somebody gives me advice on it.
>>> Thanks
>>> Alex
>>>
>>> 2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task
>>> [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
>>> (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
>>> instantiate user function.
>>> at org.apache.flink.streaming.api.graph.StreamConfig
>>> .getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11.
>>> 2.jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2
>>> .jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2
>>> .jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
>>> OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>>> StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
>>> Caused by: java.lang.ClassCastException: cannot assign instance of
>>> java.util.concurrent.ConcurrentHashMap to field
>>> com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
>>> of type com.fasterxml.jackson.databind.util.LRUMap in instance of
>>> com.fasterxml.jackson.databind.deser.DeserializerCache
>>> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
>>> ObjectStreamClass.java:2301) ~[?:1.8.0_265]
>>> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass
>>> .java:1431) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2372) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2290) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2148) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>> ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2366) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2290) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2148) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>> ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2366) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2290) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2148) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>> ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2366) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>>> 2290) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>>> .java:2148) ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>>> ~[?:1.8.0_265]
>>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>>> .java:2366) ~[?:1

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
Thanks, Arvid,
That is what I thought too. I went through all the instances where it
might've been a member variable and made sure that it's declared as
transient :-( Is there anything else I can check?
Alex

On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise  wrote:

> Are you using ObjectMapper as a non-transient field? If so, please make it
> transient and initialize in open() of a Rich*Function.
>
> On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman 
> wrote:
>
>> Hi,
>> I added my custom jar (that includes dependencies on Jackson) to Flink
>> classpath. It seems to be loaded just fine. But when the job starts I am
>> getting an exception below. I am not sure how to interpret the exception though
>> and would appreciate it if somebody gives me advice on it.
>> Thanks
>> Alex
>>
>> 2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task
>> [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/1)
>> (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
>> instantiate user function.
>> at org.apache.flink.streaming.api.graph.StreamConfig
>> .getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11.2
>> .jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2
>> .jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2
>> .jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
>> OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(
>> StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
>> Caused by: java.lang.ClassCastException: cannot assign instance of
>> java.util.concurrent.ConcurrentHashMap to field
>> com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
>> of type com.fasterxml.jackson.databind.util.LRUMap in instance of
>> com.fasterxml.jackson.databind.deser.DeserializerCache
>> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
>> ObjectStreamClass.java:2301) ~[?:1.8.0_265]
>> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass
>> .java:1431) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2372) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2290) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2148) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>> ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2366) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2290) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2148) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>> ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2366) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2290) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2148) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>> ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2366) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2290) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2148) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>> ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2366) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2290) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
>> .java:2148) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1647)
>> ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
>> .java:2366) ~[?:1.8.0_265]
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
>> 2290) ~[?:1.8.0_265]
>

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
I just switched from providing my jar while creating a Remote environment
to providing this jar on Flink's classpath. It used to work just fine when
the jar was shipped to Flink with the job graph. Now that the jar is available
to Flink on startup, the same job that used to run is failing with the
exception I provided. I suspect that it might be a classloader issue but am
not sure.

On Fri, Nov 20, 2020 at 12:10 PM Arvid Heise  wrote:

> The easiest solution for all non-trivial issues like this is to start the
> application locally in a local executor, so you can debug in your IDE.
>
> Additionally, double-check that you have no lambdas/anonymous classes that
> reference outer classes with ObjectMapper. ObjectMapper should also be
> static as it's fully immutable, so you can also check that.
>
> On Fri, Nov 20, 2020 at 8:55 PM Alexander Bagerman 
> wrote:
>
>> Thanks, Arvid,
>> That is what I thought too. I went through all the instances where it
>> might 've been a member variable and made sure that it's declared as
>> transient :-( Is there anything else I can check?
>> Alex
>>
>> On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise  wrote:
>>
>>> Are you using ObjectMapper as a non-transient field? If so, please make
>>> it transient and initialize in open() of a Rich*Function.
>>>
>>> On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman 
>>> wrote:
>>>
 Hi,
 I added my custom jar (that includes dependencies on Jackson) to Flink
 classpath. It seems to be loaded just fine. But when the job starts I am
 getting an exception below. I am not sure how to interpret the exception though
 and would appreciate it if somebody gives me advice on it.
 Thanks
 Alex

 2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task
 [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1/
 1) (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED.
 org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
 instantiate user function.
 at org.apache.flink.streaming.api.graph.StreamConfig
 .getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-1.11
 .2.jar:1.11.2]
 at org.apache.flink.streaming.runtime.tasks.OperatorChain
 .createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.2
 .jar:1.11.2]
 at org.apache.flink.streaming.runtime.tasks.OperatorChain
 .createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.2
 .jar:1.11.2]
 at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
 OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.streaming.runtime.tasks.StreamTask
 .beforeInvoke(StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
 StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
 [flink-dist_2.11-1.11.2.jar:1.11.2]
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
 [flink-dist_2.11-1.11.2.jar:1.11.2]
 at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
 Caused by: java.lang.ClassCastException: cannot assign instance of
 java.util.concurrent.ConcurrentHashMap to field
 com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
 of type com.fasterxml.jackson.databind.util.LRUMap in instance of
 com.fasterxml.jackson.databind.deser.DeserializerCache
 at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
 ObjectStreamClass.java:2301) ~[?:1.8.0_265]
 at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass
 .java:1431) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
 .java:2372) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
 2290) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
 .java:2148) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
 1647) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
 .java:2366) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
 2290) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
 .java:2148) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
 1647) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
 .java:2366) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:
 2290) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
 .java:2148) ~[?:1.8.0_265]
 at java.io.ObjectInputStream.readObject0(ObjectInputStrea

Hi I'm having problems with self-signed certificate trust with Native K8S

2020-11-20 Thread Kevin Kwon
Hi, I am using MinIO as an S3 mock backend for Native K8S.

Everything seems to be fine except that it cannot connect to S3, since
the self-signed certificate's trust store is not cloned into the Deployment
resources.

Below is, in order, how I add the trusted keystore using keytool and how
I run my app with the built image:

FROM registry.local/mde/my-flink-app:0.0.1
COPY s3/certs/public.crt $FLINK_HOME/s3-e2e-public.crt
RUN keytool \
  -noprompt \
  -alias s3-e2e-public \
  -importcert \
  -trustcacerts \
  -keystore $JAVA_HOME/lib/security/cacerts \
  -storepass changeit \
  -file $FLINK_HOME/s3-e2e-public.crt

$FLINK_HOME/bin/flink run-application \
  -t kubernetes-application \
-Denv.java.opts="-Dkafka.brokers=kafka-external:9092
-Dkafka.schema-registry.url=kafka-schemaregistry:8081" \
-Dkubernetes.container-start-command-template="%java% %classpath%
%jvmmem% %jvmopts% %logging% %class% %args%" \
-Dkubernetes.cluster-id=${K8S_CLUSTERID} \

-Dkubernetes.container.image=${DOCKER_REPO}/${ORGANISATION}/${APP_NAME}:${APP_VERSION}
\
-Dkubernetes.namespace=${K8S_NAMESPACE} \
-Dkubernetes.rest-service.exposed.type=${K8S_RESTSERVICE_EXPOSED_TYPE} \
-Dkubernetes.taskmanager.cpu=${K8S_TASKMANAGER_CPU} \
-Dresourcemanager.taskmanager-timeout=360 \
-Dtaskmanager.memory.process.size=${TASKMANAGER_MEMORY_PROCESS_SIZE} \
-Dtaskmanager.numberOfTaskSlots=${TASKMANAGER_NUMBEROFTASKSLOTS} \
-Ds3.endpoint=s3:443 \
-Ds3.access-key=${S3_ACCESSKEY} \
-Ds3.secret-key=${S3_SECRETKEY} \
-Ds3.path.style.access=true \
-Dstate.backend=filesystem \
-Dstate.checkpoints.dir=s3://${ORGANISATION}/${APP_NAME}/checkpoint \
-Dstate.savepoints.dir=s3://${ORGANISATION}/${APP_NAME}/savepoint \
local://${FLINK_HOME}/usrlib/${APP_NAME}-assembly-${APP_VERSION}.jar

However, I get the following error, and I don't see my trusted key via
keytool when I log in to the pod (it seems the trust store is not
cloned):

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
create checkpoint storage at checkpoint coordinator side.
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:305)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:224)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:483)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:338)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:269)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:242)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:229)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:119)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:103)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:284)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:272)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:140)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:388)
~[flink-dist_2.12-1.11.2.jar:1.11.2]
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
~[?:1.8.0_265]
... 6 more
Caused by: org.apache.hadoop.fs.s3a.AWSClientIOException:
doesBucketExist on mde: com.amazonaws.SdkClientException: Unable to
execute HTTP request: Unrecognized SSL message, plaintext connection?:
Unable to execute HTTP request: Unrecognized SSL message, plaintext
connection?


Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Oh no, please never put user code (with included libraries) into flink's
classpath. It's not supported exactly for classloader reasons. Why did you
think that this would be a good approach? Is your jar too big?

Maybe a different deployment mode would be more appropriate? [1]

Alternatively, if you want to go the hacky route, you could also try to
shade your dependencies.

[1] https://flink.apache.org/news/2020/07/14/application-mode.html

On Fri, Nov 20, 2020 at 9:18 PM Alexander Bagerman 
wrote:

> I just switched from providing my jar while creating a Remote environment
> to providing this jar on flink's classpath. It used to work just fine when
> the jar was shipped to Flink with the job graph. Now when jar is available
> to flink on the startup the same job that used to run is failing with
> exception I provided. I suspect that it might be class loader issue but am
> not sure
>
> On Fri, Nov 20, 2020 at 12:10 PM Arvid Heise  wrote:
>
>> The easiest solution for all non-trivial issues like this is to start the
>> application locally in a local executor, so you can debug in your IDE.
>>
>> Additionally, double-check that you have no lambdas/anonymous classes
>> that reference outer classes with ObjectMapper. ObjectMapper should also be
>> static as it's fully immutable, so you can also check that.
>>
>> On Fri, Nov 20, 2020 at 8:55 PM Alexander Bagerman 
>> wrote:
>>
>>> Thanks, Arvid,
>>> That is what I thought too. I went through all the instances where it
>>> might 've been a member variable and made sure that it's declared as
>>> transient :-( Is there anything else I can check?
>>> Alex
>>>
>>> On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise 
>>> wrote:
>>>
 Are you using ObjectMapper as a non-transient field? If so, please make
 it transient and initialize in open() of a Rich*Function.

 On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman 
 wrote:

> Hi,
> I added my custom jar (that includes dependencies on Jackson) to Flink
> classpath. It seems to be loaded just fine. But when the job starts I am
> getting an exception below. I am not sure how to interpret the exception 
> though
> and would appreciate it if somebody gives me advice on it.
> Thanks
> Alex
>
> 2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.Task
> [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink: Unnamed (1
> /1) (dcbf799dadba5d4b7e7f5af15919a4b6) switched from RUNNING to FAILED
> .
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at org.apache.flink.streaming.api.graph.StreamConfig
> .getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-
> 1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11.
> 2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain
> .createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11.
> 2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
> OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask
> .beforeInvoke(StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.2
> ]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> [flink-dist_2.11-1.11.2.jar:1.11.2]
> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
> Caused by: java.lang.ClassCastException: cannot assign instance of
> java.util.concurrent.ConcurrentHashMap to field
> com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
> of type com.fasterxml.jackson.databind.util.LRUMap in instance of
> com.fasterxml.jackson.databind.deser.DeserializerCache
> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
> ObjectStreamClass.java:2301) ~[?:1.8.0_265]
> at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass
> .java:1431) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
> .java:2372) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream
> .java:2290) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream
> .java:2148) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:
> 1647) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream
> .java:2366) ~[?:1.8.0_265]
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream

Re: Jackson de/serialization exception?

2020-11-20 Thread Alexander Bagerman
A couple of reasons I've done that:
- it's listed as an option here:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
under optional libraries
- I have over 200 jobs running that rely on the same core functionality
provided by the jar in question and it seems somewhat wasteful to do 200
uploads of 80M+ of the same code to the cluster
- I was having issues shipping jar content when it was packaged as a part
of my "fat" jar application. I was providing a URL to the jar
(jar:file:!/xxx) but flink could not find it there.


On Fri, Nov 20, 2020 at 12:29 PM Arvid Heise  wrote:

> Oh no, please never put user code (with included libraries) into Flink's
> classpath. It's not supported, precisely because of classloading issues. Why did
> you think that this would be a good approach? Is your jar too big?
>
> Maybe a different deployment mode would be more appropriate? [1]
>
> Alternatively, if you want to go the hacky route, you could also try to
> shade your dependencies.
>
> [1] https://flink.apache.org/news/2020/07/14/application-mode.html
>
> On Fri, Nov 20, 2020 at 9:18 PM Alexander Bagerman 
> wrote:
>
>> I just switched from providing my jar while creating a Remote environment
>> to providing this jar on Flink's classpath. It used to work just fine when
>> the jar was shipped to Flink with the job graph. Now that the jar is available
>> to Flink on startup, the same job that used to run is failing with the
>> exception I provided. I suspect that it might be a class loader issue but am
>> not sure.
>>
>> On Fri, Nov 20, 2020 at 12:10 PM Arvid Heise  wrote:
>>
>>> The easiest solution for all non-trivial issues like this is to start
>>> the application locally in a local executor, so you can debug in your IDE.
>>>
>>> Additionally, double-check that you have no lambdas/anonymous classes
>>> that reference outer classes with ObjectMapper. ObjectMapper should also be
>>> static as it's fully immutable, so you can also check that.
>>>
>>> On Fri, Nov 20, 2020 at 8:55 PM Alexander Bagerman 
>>> wrote:
>>>
 Thanks, Arvid,
 That is what I thought too. I went through all the instances where it
 might've been a member variable and made sure that it's declared as
 transient :-( Is there anything else I can check?
 Alex

 On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise 
 wrote:

> Are you using ObjectMapper as a non-transient field? If so, please
> make it transient and initialize in open() of a Rich*Function.
>
> On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman 
> wrote:
>
>> Hi,
>> I added my custom jar (that includes dependencies on Jackson) to
>> Flink classpath. It seems to be loaded just fine. But when the job 
>> starts I
>> am getting an exception below. I am not sure how to interpret the exception,
>> though, and would appreciate it if somebody could give me advice on it.
>> Thanks
>> Alex
>>
>> 2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.
>> Task [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink:
>> Unnamed (1/1) (dcbf799dadba5d4b7e7f5af15919a4b6) switched from
>> RUNNING to FAILED.
>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
>> instantiate user function.
>> at org.apache.flink.streaming.api.graph.StreamConfig
>> .getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-
>> 1.11.2.jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-1.11
>> .2.jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>> .createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-1.11
>> .2.jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.OperatorChain.(
>> OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>> .beforeInvoke(StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11.
>> 2]
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:528) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
>> [flink-dist_2.11-1.11.2.jar:1.11.2]
>> at java.lang.Thread.run(Thread.java:748) [?:1.8.0_265]
>> Caused by: java.lang.ClassCastException: cannot assign instance of
>> java.util.concurrent.ConcurrentHashMap to field
>> com.fasterxml.jackson.databind.deser.DeserializerCache._cachedDeserializers
>> of type com.fasterxml.jackson.databind.util.LRUMap in instance of
>> com.fasterxml.jackson.databind.deser.DeserializerCache
>> at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(
>> ObjectStreamCla

Re: Logs of JobExecutionListener

2020-11-20 Thread Flavio Pompermaier
I think that the problem is that my REST service submits the job to
the Flink standalone cluster and responds to the client with the
submitted job ID.
To achieve this, I was using the
RestClusterClient because with that I can use the
following code and retrieve the JobID:

(1) JobID flinkJobId =
client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();

Unfortunately this does not activate the job listener (that is quite
surprising to me...I thought that such a listener was triggered by the
JobManager).
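For context, the listener is a purely client-side hook registered on the execution environment, so it only fires around that environment's execute()/executeAsync() call; a minimal sketch of the wiring (job name and toy pipeline are invented):

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobListenerSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.registerJobListener(new JobListener() {
            @Override
            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
                // Runs in the client JVM right after submission.
                if (jobClient != null) {
                    System.out.println("Submitted job " + jobClient.getJobID());
                }
            }

            @Override
            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
                System.out.println("Job finished: " + result);
            }
        });

        env.fromElements(1, 2, 3).print();
        env.execute("job-listener-sketch"); // the listener only fires around this call
    }
}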
So, after Aljoscha's answer I took a deeper look into the Flink CLI code
and what it does is basically this:

(2) ClientUtils.executeProgram(new DefaultExecutorServiceLoader(),
flinkConf, packagedProgram, false, false);

That works as expected (I wasn't aware of the ThreadLocal mechanism
used by the ContextEnvironment and StreamContextEnvironment: a very
advanced programming technique) but it does not allow me to get back the
job ID that I need. I can live with that because I can save the Flink
job ID in an external service when the job listener triggers the
onJobSubmitted method, but I think this mechanism is quite weird.

So my question is: is there a simple way to achieve my goal? Am I
doing something wrong?
At the moment I had to implement a job-status polling thread after
line (1), but this looks like a workaround to me.
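For reference, a rough sketch of that polling loop against the public ClusterClient API (the helper class name and the poll interval are invented):

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.program.ClusterClient;

// Hypothetical helper: block until the job reaches a globally terminal state.
public final class JobStatusPoller {
    public static JobStatus waitForTermination(
            ClusterClient<?> client, JobID jobId, long pollIntervalMs) throws Exception {
        while (true) {
            JobStatus status = client.getJobStatus(jobId).get();
            if (status.isGloballyTerminalState()) {
                return status;
            }
            Thread.sleep(pollIntervalMs);
        }
    }
}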

Best,
Flavio

On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier  wrote:
>
> You're right. I removed my Flink dir and re-extracted it, and now it
> works. Unfortunately I didn't keep the old version to understand what
> the difference was, but the error was probably caused by the fact that
> I had a previous version of the WordCount.jar (without the listener)
> in the Flink lib dir (in another dev session I was experimenting with
> running the job with the user jar in the lib dir). Sorry for the
> confusion.
> Just one last question: is the listener executed on the client or on
> the job server? This is not entirely clear to me..
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin  wrote:
> >
> > I also tried 1.11.0 and 1.11.2, both work for me.
> >
> > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek  
> > wrote:
> >>
> >> Hmm, there was this issue:
> >> https://issues.apache.org/jira/browse/FLINK-17744 But it should be fixed
> >> in your version.
> >>
> >> On 19.11.20 12:58, Flavio Pompermaier wrote:
> >> > Which version are you using?
> >> > I used the exact same commands on Flink 1.11.0 and I didn't get the job
> >> > listener output..
> >> >
> >> > On Thu, 19 Nov 2020 at 12:53, Andrey Zagrebin  wrote:
> >> >
> >> >> Hi Flavio and Aljoscha,
> >> >>
> >> >> Sorry for the late heads up. I could not actually reproduce the reported
> >> >> problem with 'flink run' and local standalone cluster on master.
> >> >> I get the expected output with the suggested modification of WordCount
> >> >> program:
> >> >>
> >> >> $ bin/start-cluster.sh
> >> >>
> >> >> $ rm -rf out; bin/flink run
> >> >> flink/flink-examples/flink-examples-batch/target/WordCount.jar --output
> >> >> flink/build-target/out
> >> >>
> >> >> Executing WordCount example with default input data set.
> >> >> Use --input to specify file input.
> >> >>  SUBMITTED
> >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> >> >> Program execution finished
> >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> >> >> Job Runtime: 139 ms
> >> >>
> >> >>  EXECUTED
> >> >>
> >> >> Best,
> >> >> Andrey
> >> >>
> >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek 
> >> >> wrote:
> >> >>
> >> >>> JobListener.onJobExecuted() is only invoked in
> >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If 
> >> >>> none
> >> >>> of these is still in the call chain with that setup then the listener
> >> >>> will not be invoked.
> >> >>>
> >> >>> Also, this would only happen on the client, not on the broker (in your
> >> >>> case) or the server (JobManager).
> >> >>>
> >> >>> Does that help to debug the problem?
> >> >>>
> >> >>> Aljoscha
> >> >>>
> >> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
> >>  I have a spring boot job server that acts as a broker towards our
> >>  application and a Flink session cluster. To submit a job I use the
> >>  FlinkRestClient (which is also the one used by the CLI client when I
> >>  use the run action, if I'm not wrong). However, neither method triggers
> >>  the job listener.
> >> 
> >>  On Thu, 19 Nov 2020 at 09:39, Aljoscha Krettek  wrote:
> >> 
> >> > @Flavio, when you're saying you're using the RestClusterClient, you are
> >> > not actually using that manually, right? You're just submitting your job
> >> > via "bin/flink run ...", right?
> >> >
> >> > What's the exact invocation of "bin/flink run" that you're using?
> >> >

Re: Jackson de/serialization exception?

2020-11-20 Thread Arvid Heise
Hm, yes, those are good reasons. The issue is that if you put it into Flink,
then it's part of the system classloader of Flink, so there is no way to
unload classes or protect Flink's classes (+its dependencies) from being
overwritten by your dependencies. I'm thinking that this may cause
differences in the serialization and deserialization of task configuration.

If you want to go hardcore, you could list all class files in your jar and
in the remaining libs and check for intersections (a rough sketch of such a
check follows the list below). These intersections should be
- excluded from your jar, or
- must be the exact same version (but then you could just include it to
reduce size), or
- you need to shade them within your jar.
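A minimal sketch of such an intersection check using only the JDK (class and variable names are invented; it compares entry names only, not versions):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.jar.JarFile;
import java.util.stream.Stream;

// Hypothetical helper: print .class entries that appear both in the user jar
// and in any jar under flink/lib.
public final class JarOverlapCheck {
    public static void main(String[] args) throws IOException {
        Set<String> userClasses = classEntries(Paths.get(args[0])); // path to your jar
        try (Stream<Path> libs = Files.list(Paths.get(args[1]))) {  // path to flink/lib
            libs.filter(p -> p.toString().endsWith(".jar")).forEach(lib -> {
                try {
                    Set<String> overlap = classEntries(lib);
                    overlap.retainAll(userClasses);
                    overlap.forEach(c -> System.out.println(lib.getFileName() + " <-> " + c));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        }
    }

    private static Set<String> classEntries(Path jar) throws IOException {
        Set<String> names = new HashSet<>();
        try (JarFile jarFile = new JarFile(jar.toFile())) {
            jarFile.stream()
                    .map(entry -> entry.getName())
                    .filter(name -> name.endsWith(".class"))
                    .forEach(names::add);
        }
        return names;
    }
}

Anything that shows up in the output is a candidate for exclusion or shading, as described in the list above.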

On Fri, Nov 20, 2020 at 9:36 PM Alexander Bagerman 
wrote:

> A couple of reasons I've done that:
> - it's listed as an option here:
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/docker.html#advanced-customization
> under optional libraries
> - I have over 200 jobs running that rely on the same core functionality
> provided by the jar in question and it seems somewhat wasteful to do 200
> uploads of 80M+ of the same code to the cluster
> - I was having issues shipping jar content when it was packaged as a part
> of my "fat" jar application. I was providing a URL to the jar
> (jar:file:!/xxx) but flink could not find it there.
>
>
> On Fri, Nov 20, 2020 at 12:29 PM Arvid Heise  wrote:
>
>> Oh no, please never put user code (with included libraries) into Flink's
>> classpath. It's not supported, precisely because of classloading issues. Why did
>> you think that this would be a good approach? Is your jar too big?
>>
>> Maybe a different deployment mode would be more appropriate? [1]
>>
>> Alternatively, if you want to go the hacky route, you could also try to
>> shade your dependencies.
>>
>> [1] https://flink.apache.org/news/2020/07/14/application-mode.html
>>
>> On Fri, Nov 20, 2020 at 9:18 PM Alexander Bagerman 
>> wrote:
>>
>>> I just switched from providing my jar while creating a Remote
>>> environment to providing this jar on Flink's classpath. It used to work
>>> just fine when the jar was shipped to Flink with the job graph. Now that
>>> the jar is available to Flink on startup, the same job that used to run is
>>> failing with the exception I provided. I suspect that it might be a class
>>> loader issue but am not sure.
>>>
>>> On Fri, Nov 20, 2020 at 12:10 PM Arvid Heise 
>>> wrote:
>>>
 The easiest solution for all non-trivial issues like this is to start
 the application locally in a local executor, so you can debug in your IDE.

 Additionally, double-check that you have no lambdas/anonymous classes
 that reference outer classes with ObjectMapper. ObjectMapper should also be
 static as it's fully immutable, so you can also check that.

 On Fri, Nov 20, 2020 at 8:55 PM Alexander Bagerman 
 wrote:

> Thanks, Arvid,
> That is what I thought too. I went through all the instances where it
> might've been a member variable and made sure that it's declared as
> transient :-( Is there anything else I can check?
> Alex
>
> On Fri, Nov 20, 2020 at 11:50 AM Arvid Heise 
> wrote:
>
>> Are you using ObjectMapper as a non-transient field? If so, please
>> make it transient and initialize in open() of a Rich*Function.
>>
>> On Fri, Nov 20, 2020 at 7:56 PM Alexander Bagerman <
>> bager...@gmail.com> wrote:
>>
>>> Hi,
>>> I added my custom jar (that includes dependencies on Jackson) to
>>> Flink classpath. It seems to be loaded just fine. But when the job 
>>> starts I
>>> am getting an exception below. I am not sure how to interpret the exception,
>>> though, and would appreciate it if somebody could give me advice on it.
>>> Thanks
>>> Alex
>>>
>>> 2020-11-20 18:34:35,643 WARN org.apache.flink.runtime.taskmanager.
>>> Task [] - Source: Custom Source -> Flat Map -> Flat Map -> Sink:
>>> Unnamed (1/1) (dcbf799dadba5d4b7e7f5af15919a4b6) switched from
>>> RUNNING to FAILED.
>>> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
>>> instantiate user function.
>>> at org.apache.flink.streaming.api.graph.StreamConfig
>>> .getStreamOperatorFactory(StreamConfig.java:275) ~[flink-dist_2.11-
>>> 1.11.2.jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createChainedOperator(OperatorChain.java:471) ~[flink-dist_2.11-
>>> 1.11.2.jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .createOutputCollector(OperatorChain.java:393) ~[flink-dist_2.11-
>>> 1.11.2.jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.OperatorChain
>>> .(OperatorChain.java:155) ~[flink-dist_2.11-1.11.2.jar:1.11.2]
>>> at org.apache.flink.streaming.runtime.tasks.StreamTask
>>> .beforeInvoke(StreamTask.java:459) ~[flink-dist_2.11-1.11.2.jar:1.11
>>> .2]
>>> at org.a

Concise example of how to deploy flink on Kubernetes

2020-11-20 Thread George Costea
Hi there,

Is there an example of how to deploy a flink cluster on Kubernetes?
I'd like to deploy the flink cluster, a kafka-broker, and then the
greeter example to give it a try.

Thanks,
George


Re: Force Join Unique Key

2020-11-20 Thread Rex Fenley
I have a few more questions.

Even if a join has no unique keys, couldn't the join key be used to
organize records into a tree of record groups per join key, so that
lookups are faster?

I have also been looking at the RocksDB docs, and it looks like it has a
RangeScan operation. I'm guessing join keys could then be hashed in
such a way as to enable faster lookups via RangeScan. I also noticed mention of
Prefix Iterators, which might actually do what I'm suggesting.

Have either of these been considered?

Thanks!

On Thu, Nov 19, 2020 at 6:51 PM Rex Fenley  wrote:

> I'm reading your response as rocksdb having to seek across the whole
> dataset for the whole table, which we hope to avoid.
>
> What are the rules for the unique key and unique join key inference? Maybe
> we can reorganize our plan to allow it to infer unique keys more correctly.
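One concrete way to feed unique-key information into that inference is to declare primary keys on the source tables and then inspect the plan; a hedged sketch (table names, columns, and connector options are placeholders, and whether the join actually ends up with a unique input spec still depends on the rest of the plan):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PrimaryKeySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // PRIMARY KEY ... NOT ENFORCED gives the planner unique-key metadata for the table.
        tEnv.executeSql(
                "CREATE TABLE users ("
                        + "  user_id BIGINT,"
                        + "  name STRING,"
                        + "  PRIMARY KEY (user_id) NOT ENFORCED"
                        + ") WITH ('connector' = '...')"); // placeholder connector options

        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  order_id BIGINT,"
                        + "  user_id BIGINT,"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ('connector' = '...')"); // placeholder connector options

        // Inspect the plan: with primary keys declared, the join inputs should no
        // longer fall back to NoUniqueKey (subject to the rest of the plan).
        System.out.println(tEnv.explainSql(
                "SELECT o.order_id, u.name FROM orders o JOIN users u ON o.user_id = u.user_id"));
    }
}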
>
> Thanks
>
> On Wed, Nov 18, 2020 at 9:50 PM Jark Wu  wrote:
>
>> Yes, exactly. RocksDB has to "seek" over the data set because it doesn't know
>> how many entries are under the join key.
>>
>> On Thu, 19 Nov 2020 at 13:38, Rex Fenley  wrote:
>>
>>> Ok, but if there is only 1 row per Join key on either side of the join,
>>> then wouldn't "iterate all the values in the MapState under the current
>>> key" effectively be "iterate 1 value in MapState under the current key"
>>> which would be O(1)? Or are you saying that it must seek across the entire
>>> dataset for the whole table even for that 1 row on either side of the join?
>>>
>>> Thanks for the help so far!
>>>
>>> On Wed, Nov 18, 2020 at 6:30 PM Jark Wu  wrote:
>>>
 Actually, if there is no unique key, it's not O(1), because there may be
 multiple rows joined by the join key, i.e. it has to iterate over all the values
 in the MapState under the current key; this is a "seek" operation on RocksDB,
 which is not efficient.

 Are you asking where the join key is set? The join key is set by the
 framework via `AbstractStreamOperator#setKeyContextElement1`.

 Best,
 Jark

 On Thu, 19 Nov 2020 at 03:18, Rex Fenley  wrote:

> Thanks for the info.
>
> So even if there is no unique key inferred for a Row, the set of rows
> to join on each Join key should effectively still be an O(1) lookup if the
> join key is unique right?
>
> Also, I've been digging around the code to find where the lookup of
> rows for a join key happens and haven't come across anything. Mind 
> pointing
> me in the right direction?
>
> Thanks!
>
> cc Brad
>
> On Wed, Nov 18, 2020 at 7:39 AM Jark Wu  wrote:
>
>> Hi Rex,
>>
>> Currently, the join operator may use 3 kinds of state structure
>> depending on the input key and join key information.
>>
>> 1) input doesn't have a unique key => MapState,
>> where the map key is the input row and the map value is the number
>> of equal rows.
>>
>> 2) input has unique key, but the unique key is not a subset of join
>> key => MapState
>> this is better than the above one, because it has a shorter map key
>> and
>> is more efficient when retracting records.
>>
>> 3) input has a unique key, and the unique key is a subset of join key
>> => ValueState
>> this is the best performance, because it only performs a "get"
>> operation rather than "seek" on rocksdb
>>  for each record of the other input side.
>>
>> Note: the join key is the key of the keyed states.
>>
>> You can see the implementation differences
>> in 
>> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.
>>
>> Best,
>> Jark
>>
>> On Wed, 18 Nov 2020 at 02:30, Rex Fenley  wrote:
>>
>>> Ok, what are the performance consequences then of having a join with
>>> NoUniqueKey if the left side's key actually is unique in practice?
>>>
>>> Thanks!
>>>
>>>
>>> On Tue, Nov 17, 2020 at 7:35 AM Jark Wu  wrote:
>>>
 Hi Rex,

 Currently, the unique key is inferred by the optimizer. However,
 the inference is not perfect.
 There are known issues where the unique key is not derived
 correctly, e.g. FLINK-20036 (was this opened by you?). If you think you
 have the same case, please open an issue.

 Query hint is a nice way for this, but it is not supported yet.
 We have an issue to track supporting query hint, see FLINK-17173.

 Best,
 Jark


 On Tue, 17 Nov 2020 at 15:23, Rex Fenley  wrote:

> Hello,
>
> I have quite a few joins in my plan that have
>
> leftInputSpec=[NoUniqueKey]
>
> in Flink UI. I know this can't truly be the case that there is no
> unique key, at least for some of these joins that I've evaluated.
>
> Is there a way to hint to the join what the u

Re: Concise example of how to deploy flink on Kubernetes

2020-11-20 Thread Xingbo Huang
Hi George,
Have you referred to the official documentation [1]?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html

Best,
Xingbo

On Saturday, 21 Nov 2020, George Costea  wrote:
> Hi there,
>
> Is there an example of how to deploy a flink cluster on Kubernetes?
> I'd like to deploy the flink cluster, a kafka-broker, and then the
> greeter example to give it a try.
>
> Thanks,
> George
>


Re: Is possible that make two operators always locate in same taskmanager?

2020-11-20 Thread Si-li Liu
Thanks for your reply!

Yes, I want A_i and T_i to run in the same slot. Ideally, the T operator should
have parallelism 1 in the topology, and all A_i can start from the same timestamp,
but some minor difference in the resume timestamp across the A_i sources is
also acceptable. So I think multiple T operators are also OK for me here. But
the prerequisite for this topology to work is that I can make sure T and A always
reside in the same TM.

The problem here is that both stream A and stream B are very large: 200k~300k
messages per second in each stream, at 1 KB~2 KB (after compression) per
message, and I have to keep the whole message in cache. So it's hard to fit
into Flink state.



On Sat, 21 Nov 2020 at 3:35 AM, Arvid Heise  wrote:

> Your topology is definitely interesting and makes sense to me on a high
> level. The main question remaining is the parallelism. I'm assuming you run
> your pipeline with parallelism p and both source A and timestampcalculator
> T are run with parallelism p. You want to create a situation where for A_i,
> there is an T_i which run in the same slot. Am I right?
>
> If so, then as you have noticed that there is currently no way to express
> that in Flink on a high level. One more idea before trying to solve it in a
> hacky way: How large is B? Could you use a broadcast to avoid the shuffle on A?
> I'm thinking of creating a pipeline A->J(side input B)->T, because then
> it's easy to produce an operator chain, where everything even runs within
> the same thread.
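A hedged sketch of that broadcast idea using Flink's broadcast state pattern (the types, key extraction, and the cache/expiry logic discussed in this thread are placeholders, and the downstream timestamp calculator is omitted):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastJoinSketch {
    public static DataStream<String> join(DataStream<String> streamA, DataStream<String> streamB) {
        // Broadcast the (smaller) B side so A keeps its partitioning and can chain with its source.
        MapStateDescriptor<String, String> bSideState =
                new MapStateDescriptor<>("b-side", Types.STRING, Types.STRING);
        BroadcastStream<String> broadcastB = streamB.broadcast(bSideState);

        return streamA
                .connect(broadcastB)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String a, ReadOnlyContext ctx, Collector<String> out)
                            throws Exception {
                        // Look up the matching B record in the broadcast state and emit the joined result.
                        String b = ctx.getBroadcastState(bSideState).get(extractKey(a));
                        if (b != null) {
                            out.collect(a + "|" + b);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String b, Context ctx, Collector<String> out)
                            throws Exception {
                        ctx.getBroadcastState(bSideState).put(extractKey(b), b);
                    }
                });
    }

    private static String extractKey(String record) {
        return record.split(",", 2)[0]; // placeholder key extraction
    }
}

Whether B is small enough to be broadcast and kept per parallel instance is exactly the question raised above.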
>
> On Fri, Nov 20, 2020 at 4:02 PM Si-li Liu  wrote:
>
>> Thanks for your reply.
>>
>> I want to join stream A and stream B. Items in stream A come in first,
>> and I keep them in a memory cache as (join key, item); then several
>> minutes later the items in stream B come in and the join work is
>> performed. The timestamp of the latest expired item in the memory cache is
>> the safe rollback timestamp; I can resume source A from that timestamp when
>> I restart.
>>
>> It's not very precise, it may lose some items or send some items twice,
>> but it seems useful in my situation. But if the job restarts and both source A
>> and source B resume from the last consumed offset, several minutes of join
>> results will be missing, which is unacceptable.
>>
>> The topo I consider is like
>>
>> source A -> parser --shuffle--> join -> sink
>> source B -> parser ...(parallel)  |--->timestampcalculator
>>
>> The memory cache lives in the join operator, and the join operator will
>> broadcast the timestamp of the latest expired cache item to the
>> timestampcalculator. Then the timestampcalculator will use these to calculate
>> a safe rollback timestamp (a moving minimum) so that source A can resume from
>> that timestamp; source B will also restart from that timestamp. I will add a
>> bloomfilter to the sink's state to avoid duplicate items.
>>
>> So I want the timestampcalculator operator and source A to be located in
>> one TM; then I can send this timestamp from the timestampcalculator to
>> source A via a static variable.
>>
>> I hope I've made my problem clear with my poor English; it seems a little
>> tricky. But I think it's the only way to join the two streams and avoid
>> storing very huge state.
>>
>>
>>
>> On Fri, 20 Nov 2020 at 2:58 PM, Arvid Heise  wrote:
>>
>>> I still haven't fully understood. Do you mean you can't infer the
>>> timestamp in source A because it depends on some internal field of source B?
>>>
>>> How is that actually working in a parallel setting? Which timestamp is
>>> used in the different instances of a source?
>>>
>>> Say, we have task A1 which is the first subtask of source A and task B2
>>> as the second subtask of source B. How would you like them to be located?
>>> How does that correlate to the third subtask of the join (let's call it J3).
>>>
>>> Remember that through the shuffling before the join there is no clear
>>> correlation between any subtask of A or B to J...
>>>
>>> On Fri, Nov 20, 2020 at 3:58 AM Si-li Liu  wrote:
>>>
 Thanks for your help!

 Now the timestamps already go with the items in streaming. My streaming
 pipeline is like this:

 source -> parser --shuffle--> join -> sink

 Stream A and stream B go through this pipeline. I keep logs from
 stream A in a memory cache (LinkedHashMap) in the join operator, then all logs
 in stream B try to look up the cache and perform the actual join
 work.

 I try to use the timestamp of the latest expired item in memory as a
 safe rollback timestamp; if I restart the job, the source should use this
 timestamp as the start offset. The safe rollback timestamp is calculated in the
 join operator, but I want to use it in the source. So the simplest way to pass
 this information from the join operator to the source is a static variable,
 which requires the source operator and join operator to always be located in
 the same TM process.

 On Fri, 20 Nov 2020 at 3:33 AM, Arvid Heise  wrote:

> Hi Si-li,
>
> couldn't you also add the timestamp as a state to the source? So the
> time wou

Filesystem as a stream source in Table/SQL API

2020-11-20 Thread eef hhj
Hi,

I'm facing a situation where I want the Flink app to dynamically detect
changes in a filesystem batch data source. As I tried in the following
example in sql-client.sh, the select can query all the records under the
folder.

When I add a new file to the folder, the query does not refresh and it
seems it cannot detect the new file. It only reflects the records in the
new file after I cancel the current query and run the select again. Is it
possible to make the app detect such file changes automatically, like a
stream source would?

CREATE TABLE fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  hh STRING
)  WITH (
  'connector'='filesystem',
  'path'='file:///path/folder/',
  'format'='csv'
);

select * from fs_table;
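As a point of comparison, the DataStream API does support continuous directory monitoring; a hedged sketch below (path and scan interval are made up), assuming switching to the DataStream API is an option. Note that with PROCESS_CONTINUOUSLY a modified file is re-read in full, so appending to existing files leads to duplicates:

import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

public class MonitorFolderSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String folder = "file:///path/folder/";
        // Re-scan the folder every 10 seconds and pick up newly added files.
        DataStream<String> lines = env.readFile(
                new TextInputFormat(new Path(folder)),
                folder,
                FileProcessingMode.PROCESS_CONTINUOUSLY,
                10_000L);

        lines.print();
        env.execute("monitor-folder-sketch");
    }
}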

-- Best wishes
Kai