question regarding flink local buffer pool

2021-01-05 Thread Eleanore Jin
Hi experts,

I am running Flink 1.10, and the job is stateless. I am trying to
understand how the local buffer pool works:

1. Let's say taskA and taskB both run in the same TM JVM. Each task has
its own local buffer pool: taskA writes to pool-A, and taskB reads from
pool-A and writes to pool-B. If taskB consumes from pool-A more slowly
than taskA writes to it, this causes backpressure.

2. If the above assumption is correct, then this applies when taskA and
taskB are not chained together. If they are chained, is there no buffer in
between, so that the StreamRecord is passed directly from taskA to taskB?

3. What is the configuration parameter for this local buffer pool, and what
is the relationship between the local buffer pool and the network buffer
pool?

4. Is the configuration for the total local buffers per TM, and is it evenly
spread between tasks?

Thanks a lot!
Eleanore
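
The backpressure scenario in point 1 can be pictured with a toy model. This is only an analogy and not Flink's LocalBufferPool: a bounded java.util.concurrent queue stands in for pool-A, and the class name, method name, and the tick-based read/write rates are all hypothetical. It counts how often the upstream writer finds the buffer full when the downstream reader is slower.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Toy model only: a bounded queue stands in for the buffers between taskA and taskB.
final class BackpressureSketch {

    /**
     * Simulates `ticks` rounds in which the upstream task tries to write
     * `writesPerTick` records and the downstream task reads `readsPerTick`.
     * Returns how many write attempts found the buffer full.
     */
    static int countBlockedWrites(int capacity, int ticks, int writesPerTick, int readsPerTick) {
        BlockingQueue<Integer> poolA = new ArrayBlockingQueue<>(capacity);
        int blocked = 0;
        int nextRecord = 0;
        for (int tick = 0; tick < ticks; tick++) {
            // "taskA": offer() returns false instead of waiting when the queue is full.
            for (int i = 0; i < writesPerTick; i++) {
                if (poolA.offer(nextRecord)) {
                    nextRecord++;
                } else {
                    blocked++; // a real producer would have to wait here
                }
            }
            // "taskB": drains at its own, possibly slower, rate.
            for (int i = 0; i < readsPerTick; i++) {
                poolA.poll();
            }
        }
        return blocked;
    }

    public static void main(String[] args) {
        System.out.println("slow consumer: " + countBlockedWrites(4, 10, 2, 1));
        System.out.println("fast consumer: " + countBlockedWrites(4, 10, 2, 2));
    }
}
```

In this sketch the writer is only ever held up when the reader drains more slowly than the writer fills, which is the situation the question describes as backpressure.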


Task scheduling of Flink

2021-01-05 Thread penguin.
Hello! Do you know how to modify the task scheduling method of Flink?

Using key.fields in 1.12

2021-01-05 Thread Aeden Jameson
I've upgraded from 1.11.1 to 1.12 in hopes of using the key.fields
feature of the Kafka SQL Connector. My current connector is configured
as:

connector.type= 'kafka'
connector.version = 'universal'
connector.topic   = 'my-topic'
connector.properties.group.id = 'my-consumer-group'
connector.properties.bootstrap.servers = '...'
format.type = 'avro'
format.avro-schema = ''

I tried adding

key.fields = 'my_key_field'

as well as

key.format = 'avro'
key.fields = 'my_key_field'

but I get the exception

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No factory supports all properties.

The matching candidates:
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
Unsupported property keys:
key.fields
key.format

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:434)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:195)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
... 21 more

I have validated that the uber jar contains the 1.12
dependencies. What is the magic combination of properties to get
key.fields to work? Or is it not supported with Avro?

--
Thank You,
Aeden
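
A possible direction, offered as a sketch rather than a confirmed fix: the stack trace shows the legacy KafkaTableSourceSinkFactory rejecting key.fields and key.format, and the 1.12 documentation lists those options alongside the newer option style ('connector' = 'kafka', 'topic', 'properties.*') rather than the connector.type style. The DDL below assumes that newer style; the table name, the column names other than my_key_field, and the column types are hypothetical, and the elided bootstrap servers value is left as in the original.

```java
// Builds a CREATE TABLE statement using the newer Kafka connector option keys.
final class KafkaKeyFieldsDdl {

    static String ddl() {
        return String.join("\n",
            "CREATE TABLE my_table (",            // hypothetical table name
            "  my_key_field STRING,",             // column type is an assumption
            "  payload STRING",                   // hypothetical value column
            ") WITH (",
            "  'connector' = 'kafka',",
            "  'topic' = 'my-topic',",
            "  'properties.bootstrap.servers' = '...',",
            "  'properties.group.id' = 'my-consumer-group',",
            "  'key.format' = 'avro',",
            "  'key.fields' = 'my_key_field',",
            "  'value.format' = 'avro'",
            ")");
    }

    public static void main(String[] args) {
        System.out.println(ddl());
    }
}
```

The resulting string would be passed to TableEnvironment.executeSql(...). Whether the key columns should also appear in the value is governed by 'value.fields-include', which may need adjusting for the actual topic layout.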


Re: "flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-05 Thread Yang Wang
Hi Dongwon,

I think the root cause is that the GenericCLI does not override
"high-availability.cluster-id" with the specified application id.
The GenericCLI is activated by "--target yarn-per-job". The
FlinkYarnSessionCli does perform this override, so the following
command should work with or without ZooKeeper HA configured.


./bin/flink list -m yarn-cluster -yid $applicationId

You could also specify the "high-availability.cluster-id" so that leader
retrieval could get the correct JobManager address.


flink list --target yarn-per-job -Dyarn.application.id=$application_id -Dhigh-availability.cluster-id=$application_id

BTW, this is not a newly introduced behavior change in Flink 1.12. I believe
it does not work in 1.11 and 1.10 either.


Best,
Yang


On Tue, Jan 5, 2021 at 11:22 PM, Dongwon Kim wrote:

> Hi,
>
> I'm using Flink-1.12.0 and running on Hadoop YARN.
>
> After setting HA-related properties in flink-conf.yaml,
>
> high-availability: zookeeper
>
> high-availability.zookeeper.path.root: /recovery
>
> high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181
>
> high-availability.storageDir: hdfs:///flink/recovery
>
> the following command hangs and fails:
>
> $ flink list --target yarn-per-job -Dyarn.application.id=$application_id
>
> Before setting the properties, I can see the following lines after
> executing the above command:
>
> 2021-01-06 00:11:48,961 INFO  org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to deploy (auth:SIMPLE)
>
> 2021-01-06 00:11:48,968 INFO  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-8522045433029410483.conf.
>
> 2021-01-06 00:11:48,976 INFO  org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
>
> 2021-01-06 00:11:49,316 INFO  org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at nm02/10.93.0.91:10200
>
> 2021-01-06 00:11:49,324 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>
> 2021-01-06 00:11:49,333 WARN  org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
>
> 2021-01-06 00:11:49,404 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface dn03:37098 of application 'application_1600163418174_0127'.
>
> 2021-01-06 00:11:49,758 INFO  org.apache.flink.client.cli.CliFrontend [] - Waiting for response...
>
> Waiting for response...
>
> 2021-01-06 00:11:49,863 INFO  org.apache.flink.client.cli.CliFrontend [] - Successfully retrieved list of jobs
>
> -- Running/Restarting Jobs ---
>
> 31.12.2020 01:22:34 : 76fc265c44ef44ae343ab15868155de6 : stream calculator (RUNNING)
>
> --
>
> No scheduled jobs.
>
> After:
>
> 2021-01-06 00:06:38,971 INFO  org.apache.flink.runtime.security.modules.HadoopModule [] - Hadoop user set to deploy (auth:SIMPLE)
>
> 2021-01-06 00:06:38,976 INFO  org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file will be created as /tmp/jaas-3613274701724362777.conf.
>
> 2021-01-06 00:06:38,982 INFO  org.apache.flink.client.cli.CliFrontend [] - Running 'list' command.
>
> 2021-01-06 00:06:39,304 INFO  org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at nm02/10.93.0.91:10200
>
> 2021-01-06 00:06:39,312 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
>
> 2021-01-06 00:06:39,320 WARN  org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
>
> 2021-01-06 00:06:39,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface dn03:37098 of application 'application_1600163418174_0127'.
>
> 2021-01-06 00:06:39,399 INFO  org.apache.flink.runtime.util.ZooKeeperUtils [] - Enforcing default ACL for ZK connections
>
> 2021-01-06 00:06:39,399 INFO  org.apache.flink.runtime.util.ZooKeeperUtils [] - Using '/recovery/default' as Zookeeper namespace.
>
> 2021-01-06 00:06:39,425 INFO  org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility [] - Running in ZooKeeper 3.4.x compatibility

Re: Job manager high availability job restarting

2021-01-05 Thread Yang Wang
I think this is the expected behavior. When the active JobManager loses
leadership, the standby one will try to take over and recover the job from
the latest successful checkpoint.

High availability only helps with leader election/retrieval and HA metadata
storage (e.g. job graphs, checkpoints, etc.).
It cannot avoid job restarts when the JobManager fails.

Best,
Yang

On Wed, Jan 6, 2021 at 6:23 AM, Giselle van Dongen wrote:

> Hi!
>
>
> We are running a highly available Flink cluster in standalone mode with
> ZooKeeper, with 2 JobManagers and 5 TaskManagers.
>
> When the jobmanager is killed, the standby jobmanager takes over. But the
> job is also restarted.
>
> Is this the default behavior and can we avoid job restarts (for jobmanager
> failure) in some way?
>
>
> Thank you,
>
> Giselle
>


Re: Batch with Flink Streaming API version 1.12.0

2021-01-05 Thread Robert Cullen
Arvid,

I’m hoping to get your input on a process I’m working on. Originally I was
using a streaming solution but noticed that the data in the sliding windows
was getting too large over longer intervals and sometimes stopped
processing altogether. Anyway, the total counts should be a fixed number so
a batch process would be more acceptable.

The use case is this: Get counts on keys for 30 minutes of data, take those
totals and take a 30 second time slice on the same data, possibly
consecutive time slices, take the results and run it through one function:
Originally my code looked like this using Sliding Time Windows in streaming
mode:

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
//env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);

DataStream<FluentdMessage> stream = env
        .addSource(getConsumer(properties))
        .name("Kafka Source");

// Generic type arguments were missing from this listing. Tuple2<String, Long>
// follows from Tuple2.of(value.getGrokName(), 1L), assuming getGrokName()
// returns a String; the Tuple5 field types further down are assumptions.
DataStream<Tuple2<String, Long>> keyedCounts = stream
        .filter(value -> value.getGrokName() != null)
        .map(new MapFunction<FluentdMessage, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(FluentdMessage value) throws Exception {
                return Tuple2.of(value.getGrokName(), 1L);
            }
        })
        .keyBy(value -> value.f0)
        .window(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
        .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
        //.sum(2);
        .reduce((ReduceFunction<Tuple2<String, Long>>) (data1, data2) ->
                Tuple2.of(data1.f0, data1.f1 + data2.f1));

keyedCounts
        .windowAll(SlidingProcessingTimeWindows.of(Time.minutes(30), Time.seconds(30)))
        .trigger(PurgingTrigger.of(ProcessingTimeTrigger.create()))
        .process(new ProcessAllWindowFunction<Tuple2<String, Long>,
                Tuple5<String, Long, Long, Double, Long>, TimeWindow>() {

            private ValueState<Long> currentCount;

            @Override
            public void open(Configuration parameters) throws Exception {
                currentCount = getRuntimeContext().getState(
                        new ValueStateDescriptor<>("count", Long.class));
            }

            @Override
            public void process(Context context,
                                Iterable<Tuple2<String, Long>> iterable,
                                Collector<Tuple5<String, Long, Long, Double, Long>> out)
                    throws Exception {
                long count = StreamSupport.stream(iterable.spliterator(), false).count();
                if (currentCount.value() == null) {
                    currentCount.update(0L);
                }
                Iterator<Tuple2<String, Long>> iterator = iterable.iterator();
                Map<String, Long> map = new HashMap<>();
                Map<String, List<Long>> keyTotalMap = new HashMap<>();

                if (currentCount.value() < count) {
                    while (iterator.hasNext()) {
                        Tuple2<String, Long> tuple = iterator.next();
                        map.put(tuple.f0, keyDifference(tuple.f0, iterable));
                        keyTotalMap.computeIfAbsent(tuple.f0, k -> new ArrayList<>())
                                .add(tuple.f1);
                        //out.collect(Tuple3.of(tuple.f0, keyDifference(tuple.f0, iterable), sum(iterable)));
                    }

                    map.forEach((key, value) -> {
                        if (value > 0L) {
                            out.collect(Tuple5.of(
                                    key,
                                    value,
                                    sum(key, keyTotalMap),
                                    getChiSqrLoggerScore(value, sumKeys(map),
                                            sum(key, keyTotalMap), sum(keyTotalMap)),
                                    System.currentTimeMillis()));
                        }
                    });

                    //out.collect(Tuple5.of(null, null, null, null, null));
                    currentCount.update(count);
                } else {
                    // This is currently the only way to force the job to end
                    throw new InterruptedException();
                }
            }
        })
        .addSink(new RichChiLoggerInputSink())
        .name("Postgres Sink");

//globalCounts.writeAsText("s3://argo-workflow-bucket/output.txt");
env.execute("Flink Kafka Chi Log Runner");

This does not work in batch mode. So I need some guidance. Thanks!

On Tue, Jan 5, 2021 at 11:29 AM Arvid Heise  wrote:

> Sorry Robert for not checking the complete example. New sources are used
> with fromSource instead of addSource. It's not ideal but hopefully we can
> remove the old way rather soonish to avoid confusion.
>
> On Tue, Jan 5, 2021 at 5:23 PM Robert Cullen 
> wrote:
>
>> Arvid,
>>
>> Thank yo

Job manager high availability job restarting

2021-01-05 Thread Giselle van Dongen
Hi!


We are running a highly available Flink cluster in standalone mode with ZooKeeper,
with 2 JobManagers and 5 TaskManagers.

When the jobmanager is killed, the standby jobmanager takes over. But the job 
is also restarted.

Is this the default behavior and can we avoid job restarts (for jobmanager 
failure) in some way?


Thank you,

Giselle


Re: UDTAGG and SQL

2021-01-05 Thread Marco Villalobos
Hi Timo,

Can you please elaborate a bit on what you mean? I am not sure that I
completely understand.

Thank you.

Sincerely,

Marco A. Villalobos

On Tue, Jan 5, 2021 at 6:58 AM Timo Walther  wrote:

> A subquery could work but since you want to implement a UDTAGG anyway,
> you can also move the implementation there and keep the SQL query
> simple. But this is up to you. Consecutive windows are supported.
>
> Regards,
> Timo
>
>
> On 05.01.21 15:23, Marco Villalobos wrote:
> > Hi Timo,
> >
> > Thank you for the quick response.
> >
> > Neither COLLECT nor LISTAGG work because they only accept one column.
> >
> > I am trying to collect all the rows and columns into one object. Like a
> List for example.
> > Later, I need make calculations upon all the rows that were just
> collected within a window.
> >
> > Maybe I need to use a subquery, ie,  SELECT FROM (SELECT FROM)?
> >
> >> On Jan 5, 2021, at 6:10 AM, Timo Walther  wrote:
> >>
> >> Hi Marco,
> >>
> >> nesting aggregated functions is not allowed in SQL. The exception could
> be improved though. I guess the planner searches for a scalar function
> called `MyUDTAGG` in your example and cannot find one.
> >>
> >> Maybe the built-in function `COLLECT` or `LISTAGG`is what you are
> looking for?
> >>
> >>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 05.01.21 14:45, Marco Villalobos wrote:
> >>>   I am trying to use User defined Table Aggregate function directly in
> the SQL so that I could combine all the rows collected in a window into one
> object.
> >>> GIVEN a User defined Table Aggregate function
> >>> public class MyUDTAGG extends TableAggregateFunction<PurchaseWindow, PurchaseWindow> {
> >>>     public PurchaseWindow createAccumulator() {
> >>>         return new PurchaseWindow();
> >>>     }
> >>>     public void accumulate(PurchaseWindow acc, String name, double cost) {
> >>>         acc.add(name, cost);
> >>>     }
> >>>     public void emitValue(PurchaseWindow acc, Collector<PurchaseWindow> out) {
> >>>         out.collect(acc);
> >>>     }
> >>> }
> >>> THAT it is registered as
> >>> StreamTableEnvironment tEnv = ...
> >>> tEnv.registerFunction("MyUDTAGG", new MyUDTAGG());
> >>> THEN is it possible to call it in an SQL query in this manner?
> >>> SELECT MyUDTAGG(name, SUM(cost)) AS purchase_window
> >>> FROM purchases
> >>> GROUP BY TUMBLE(proactive, INTERVAL '1' DAY), name
> >>> I am receiving an SQL validation error, "No match found for function
> signature ...".
> >>> What am I doing wrong, or is there a better way to do this?
> >>
> >
>
>


Re: Submitting a job in non-blocking mode using curl and the REST API

2021-01-05 Thread Chesnay Schepler
I'm not aware of any plans to change this behavior. Overall the 
community is rather split on what role the web-submission should play; 
whether it should be a genuine way to submit jobs with the same 
capabilities as the CLI, just for prototyping or not exist at all.


As it stands we are somewhat hovering around the prototyping-take, so a 
more appropriate long-term solution would be to allow the CLI (which 
after all also goes through the REST API) to support 
proxies/authentication etc., but that is likely blocked on replacing our 
HTTP client with some off-the-shelf library.


One thing to note is that these methods (collect, print, printToErr, count)
are generally not intended for production code; they are more for prototyping.


Ideally you can find a way to simply not rely on these methods.
How/whether this can be done depends of course on the use-case; for 
example let's say a job is executed and dependent on the result of 
count/collect a second job should be executed. An alternative approach 
might write equivalent data to some external system, and then read it 
back on the client-side to orchestrate the scheduling of other jobs.


Another hacky option might be to use the CLI, but intercepting the 
message and enriching it with additional authentication information?


On 1/5/2021 5:17 PM, Adam Roberts wrote:
Thanks Chesnay for the prompt response - ah, so my cunning plan to use 
execution.attached=true doesn't sound so reasonable now then (I was 
going to look at providing that as a programArg next).
I did find, in 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html, 
this which I assume you are referring to



Web Submission behaves the same as detached mode.

With FLINK-16657 the web submission logic changes and it exposes the
same behavior as submitting a job through the CLI in detached mode.
This implies that, for instance, jobs based on the DataSet API that
were using sinks like print(), count() or collect() will now throw an
exception while before the output was simply never printed. See also
comments on related PR.


So, here's a question - if we are advised to use a proxy to support 
alternative auth mechanisms, and those mechanisms don’t work with the 
CLI (thus forcing the use of curl)...how are we supposed to submit a 
job with print(), count() or collect() etc?
I know you've said it's not supported, but is that an "at the moment" 
kinda thing? Is this something planned or something you think I should 
create a JIRA issue for?

Thanks again, much appreciated

- Original message -
From: Chesnay Schepler 
To: Adam Roberts , user@flink.apache.org
Cc:
Subject: [EXTERNAL] Re: Submitting a job in non-blocking mode
using curl and the REST API
Date: Tue, Jan 5, 2021 4:07 PM

All jobs going through the web-submission are run in detached mode
for technical reasons (blocking of threads, and information having
to be transported back to the JobManager for things like collect()).
You unfortunately cannot run non-detached/attached/blocking jobs
via the web submission, which includes the WordCount example
because it uses specific methods (the ones mentioned in the
exception; collect, print, printToErr, count).
In other words, your setup appears to be fine; you are
just trying to do something that is not supported.
On 1/5/2021 4:07 PM, Adam Roberts wrote:

Hey everyone, I've got an awesome looking Flink cluster set up
with web.submit.enable=true, and plenty of bash for handling jar
upload and then submission to a JobManager - all good so far.
Unfortunately, when I try to submit the classic WordCount
example, I get a massive error with the gist of it being:
"Job was submitted in detached mode. Results of job execution,
such as accumulators, runtime, etc. are not available. Please
make sure your program doesn't call an eager execution function
[collect, print, printToErr, count]."
So, how do I run it *not* in detached mode using curl please?
I'm intentionally not using the Flink CLI because I am using an
nginx with auth proxy set up - so I'm doing everything with curl,
in a bash script - (so, two requests - one to upload the jar,
then I get the ID from the response, and then submit the job with
that ID).
At
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html
if you ctrl-f for /run, there's nothing obvious that indicates how I
can run in blocking mode - the biggest clue I've got is
 

Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2021-01-05 Thread Daniel Peled
Thank you for your answers.
We ended up changing the isolation level to read_uncommitted, which solves
the latency problem between the two jobs, with the understanding that we may
see duplicates in the second job when the first job fails and rolls back.
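
For readers following along, the setting discussed in this thread is the Kafka consumer property isolation.level, which accepts read_committed or read_uncommitted. Below is a minimal sketch of building the consumer properties with either value; the class and method names, the bootstrap address, and the group id are hypothetical placeholders, not values from this thread.

```java
import java.util.Properties;

// Builds Kafka consumer properties with an explicit isolation level.
final class IsolationLevelSketch {

    static Properties consumerProps(boolean readCommitted) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical address
        props.setProperty("group.id", "second-job");              // hypothetical group id
        // read_committed: only see records from committed transactions (higher latency).
        // read_uncommitted: see records immediately, including ones later rolled back.
        props.setProperty("isolation.level",
                readCommitted ? "read_committed" : "read_uncommitted");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps(false).getProperty("isolation.level"));
    }
}
```

A Properties object like this is what the FlinkKafkaConsumer constructor accepts, so the trade-off described above comes down to the value chosen for this one key.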

On Tue, Jan 5, 2021 at 15:23, 赵一旦 wrote:

> I think what you need is
> http://kafka.apache.org/documentation/#consumerconfigs_isolation.level .
>
> The isolation.level setting's default value is read_uncommitted. So,
> maybe you do not use the default setting?
>
> On Tue, Jan 5, 2021 at 9:10 PM, 赵一旦 wrote:
>
>> I do not have this problem, so I guess it is related with the config of
>> your kafka producer and consumer, and maybe kafka topic properties or kafka
>> server properties also.
>>
>> On Tue, Jan 5, 2021 at 6:47 PM, Arvid Heise wrote:
>>
>>> Hi Daniel,
>>>
>>> Flink commits transactions on checkpoints while Kafka Streams/connect
>>> usually commits on record. This is the typical tradeoff between Throughput
>>> and Latency. By decreasing the checkpoint interval in Flink, you can reach
>>> comparable latency to Kafka Streams.
>>>
>>> If you have two exactly once jobs, the second job may only read data
>>> that has been committed (not dirty as Chesnay said). If the second job were
>>> to consume data that is uncommitted, it will result in duplicates, in case
>>> the first job fails and rolls back.
>>>
>>> You can configure the read behavior with isolation.level. If you want
>>> to implement exactly once behavior, you also need to set that level in your
>>> other Kafka consumers [1]. Also compare what Kafka Streams is setting if
>>> you want to go exactly once [2].
>>>
>>> If you really want low latency, please also double-check if you really
>>> need exactly once.
>>>
>>> [1]
>>> https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
>>> [2]
>>> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee
>>>
>>> On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler 
>>> wrote:
>>>
 I don't know our Kafka connector particularly well, but it sounds like a
 matter of whether a given consumer does dirty reads.
 Flink does not, whereas the other tools you're using do.

 On 12/28/2020 7:57 AM, Daniel Peled wrote:

 Hello,

 We have 2 flink jobs that communicate with each other through a KAFKA
 topic.
 Both jobs use checkpoints with EXACTLY ONCE semantic.

 We have seen the following behaviour and we want to make sure and ask
 if this is the expected behaviour or maybe it is a bug.

 When the first job produces a message to KAFKA, the message is consumed
  by the second job with a latency that depends on the *first* job 
 *checkpoint
 interval*.

 We are able to read the message using the kafka tool or using another
 kafka consumer, but NOT with a flink kafka consumer that again depends on
 the checkpoint interval of the first job.

 How come the consumer of the second job depends on the producer
 transaction commit time of the first job ?

 BR,
 Danny



>>>
>>> --
>>>
>>> Arvid Heise | Senior Java Developer
>>>
>>> 
>>>
>>> Follow us @VervericaData
>>>
>>> --
>>>
>>> Join Flink Forward  - The Apache Flink
>>> Conference
>>>
>>> Stream Processing | Event Driven | Real Time
>>>
>>> --
>>>
>>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>> 
>>>
>>> --
>>> Ververica GmbH
>>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>>> (Toni) Cheng
>>>
>>


Re: Batch with Flink Streaming API version 1.12.0

2021-01-05 Thread Arvid Heise
Sorry Robert for not checking the complete example. New sources are used
with fromSource instead of addSource. It's not ideal but hopefully we can
remove the old way rather soonish to avoid confusion.

On Tue, Jan 5, 2021 at 5:23 PM Robert Cullen  wrote:

> Arvid,
>
> Thank you. Sorry, my last post was not clear. This line:
>
> env.addSource(source)
>
> does not compile since addSource is expecting a SourceFunction not a
> KafkaSource type.
>
> On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise  wrote:
>
>> Robert,
>>
>> here I modified your example with some highlights.
>>
>> final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>> KafkaSource<String> source = KafkaSource
>>         .<String>builder()
>>         .setBootstrapServers("kafka-headless:9092")
>>         .setTopics(Arrays.asList("log-input"))
>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>         .setBounded(OffsetsInitializer.latest()) // highlighted: setBounded instead of setUnbounded
>>         .build();
>>
>> env.addSource(source);
>>
>> You can also explicitly set the runtime mode, but that shouldn't be necessary
>> (and may make things more complicated once you also want to execute the
>> application in streaming mode).
>>
>> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>>
>>
>> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen 
>> wrote:
>>
>>> Arvid,
>>>
>>> Thanks. Can you show me an example of how the source is tied to the
>>> ExecutionEnvironment?
>>>
>>> final StreamExecutionEnvironment env =
>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>>
>>> KafkaSource<String> source = KafkaSource
>>>         .<String>builder()
>>>         .setBootstrapServers("kafka-headless:9092")
>>>         .setTopics(Arrays.asList("log-input"))
>>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>>         .setUnbounded(OffsetsInitializer.latest())
>>>         .build();
>>>
>>> env.addSource(source);
>>>
>>>
>>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise  wrote:
>>>
 Hi Robert,

 you basically just (re)write your application with DataStream API, use
 the new KafkaSource, and then let the automatic batch detection mode work
 [1].
 The most important part is that all your sources need to be bounded.
 Assuming that you just have a Kafka source, you need to use setBounded
 with the appropriate end offset/timestamp.

 Note that the rewritten Kafka source still has a couple of issues that
 should be addressed by the first bugfix release of 1.12 in this month. So
 while it's safe to use for development, I'd wait for 1.12.1 to roll it out
 on production.

 If you have specific questions on the migration from DataSet and
 DataStream, please let me know.

 [1]
 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html

 On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen 
 wrote:

> I have a Kafka source that I would like to run a batch job on.  Since
> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
> DataStream API, can someone show me an example of this? (Using DataStream)
>
> thanks
> --
> Robert Cullen
> 240-475-4490
>



>>>
>>>
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>>
>>
>
>
> --
> Robert Cullen
> 240-475-4490
>



Re: Batch with Flink Streaming API version 1.12.0

2021-01-05 Thread Robert Cullen
Arvid,

Thank you. Sorry, my last post was not clear. This line:

env.addSource(source)

does not compile since addSource is expecting a SourceFunction not a
KafkaSource type.

On Tue, Jan 5, 2021 at 11:16 AM Arvid Heise  wrote:

> Robert,
>
> here I modified your example with some highlights.
>
> final StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> KafkaSource<String> source = KafkaSource
>         .<String>builder()
>         .setBootstrapServers("kafka-headless:9092")
>         .setTopics(Arrays.asList("log-input"))
>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>         .setBounded(OffsetsInitializer.latest()) // highlighted: setBounded instead of setUnbounded
>         .build();
>
> env.addSource(source);
>
> You can also explicitly set the runtime mode, but that shouldn't be necessary
> (and may make things more complicated once you also want to execute the
> application in streaming mode).
>
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>
>
> On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen 
> wrote:
>
>> Arvid,
>>
>> Thanks. Can you show me an example of how the source is tied to the
>> ExecutionEnvironment?
>>
>> final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.getExecutionEnvironment();
>> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>>
>> KafkaSource<String> source = KafkaSource
>>         .<String>builder()
>>         .setBootstrapServers("kafka-headless:9092")
>>         .setTopics(Arrays.asList("log-input"))
>>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>>         .setUnbounded(OffsetsInitializer.latest())
>>         .build();
>>
>> env.addSource(source);
>>
>>
>> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise  wrote:
>>
>>> Hi Robert,
>>>
>>> you basically just (re)write your application with DataStream API, use
>>> the new KafkaSource, and then let the automatic batch detection mode work
>>> [1].
>>> The most important part is that all your sources need to be bounded.
>>> Assuming that you just have a Kafka source, you need to use setBounded
>>> with the appropriate end offset/timestamp.
>>>
>>> Note that the rewritten Kafka source still has a couple of issues that
>>> should be addressed by the first bugfix release of 1.12 in this month. So
>>> while it's safe to use for development, I'd wait for 1.12.1 to roll it out
>>> on production.
>>>
>>> If you have specific questions on the migration from DataSet and
>>> DataStream, please let me know.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>>
>>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen 
>>> wrote:
>>>
 I have a Kafka source that I would like to run a batch job on.  Since
 Version 1.12.0 is now soft deprecating the DataSet API in favor of the
 DataStream API, can someone show me an example of this? (Using DataStream)

 thanks
 --
 Robert Cullen
 240-475-4490

>>>
>>>
>>>
>>
>>
>> --
>> Robert Cullen
>> 240-475-4490
>>
>
>
>


-- 
Robert Cullen
240-475-4490


RE: Submitting a job in non-blocking mode using curl and the REST API

2021-01-05 Thread Adam Roberts
Thanks Chesnay for the prompt response - ah, so my cunning plan to use execution.attached=true doesn't sound so reasonable now then (I was going to look at providing that as a programArg next).
 
I did find, in https://ci.apache.org/projects/flink/flink-docs-release-1.11/release-notes/flink-1.11.html, this which I assume you are referring to
Web Submission behaves the same as detached mode.
With FLINK-16657 the web submission logic changes and it exposes the same behavior as submitting a job through the CLI in detached mode. This implies that, for instance, jobs based on the DataSet API that were using sinks like print(), count() or collect() will now throw an exception while before the output was simply never printed. See also comments on related PR.
 
So, here's a question - if we are advised to use a proxy to support alternative auth mechanisms, and those mechanisms don’t work with the CLI (thus forcing the use of curl)...how are we supposed to submit a job with print(), count() or collect() etc?
 
I know you've said it's not supported, but is that an "at the moment" kinda thing? Is this something planned or something you think I should create a JIRA issue for?
 
Thanks again, much appreciated
 
- Original message -
From: Chesnay Schepler
To: Adam Roberts, user@flink.apache.org
Cc:
Subject: [EXTERNAL] Re: Submitting a job in non-blocking mode using curl and the REST API
Date: Tue, Jan 5, 2021 4:07 PM

 

All jobs going through the web-submission are run in detached mode for technical reasons (blocking of threads, and information having to be transported back to the JobManager for things like collect()).
 
You unfortunately cannot run non-detached/attached/blocking jobs via the web submission, which includes the WordCount example because it uses specific methods (the ones mentioned in the exception; collect, print, printToErr, count).
 
In other words, your setup appears to be working correctly; you are just trying to do something that is not supported.
 
On 1/5/2021 4:07 PM, Adam Roberts wrote:

Hey everyone, I've got an awesome looking Flink cluster set up with web.submit.enable=true, and plenty of bash for handling jar upload and then submission to a JobManager - all good so far.
 
Unfortunately, when I try to submit the classic WordCount example, I get a massive error with the gist of it being:
 
"Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]."
 
So, how do I run it *not* in detached mode using curl please?
 
I'm intentionally not using the Flink CLI because I am using an nginx with auth proxy set up - so I'm doing everything with curl, in a bash script - (so, two requests - one to upload the jar, then I get the ID from the response, and then submit the job with that ID).
 
At https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html if you ctrl-f for /run, there's nothing obvious that indicates how I can run in blocking mode - the biggest clue I've got is `programArg`. So I'm wondering if I can provide that somehow.
 
For those who prefer code:
 
curl ${auth_options} ${self_signed_flag} ${ca_cert_flag} -X POST -H "Content-Type: application/json" https://${JOBMANAGER}/jars/${uploaded_jar_string}/run$programArgsToUse
 
Whereby programArgsToUse is user args, and I'm cool with them being query parameters for now - I think.
 
I'm passing them on the end with:
 
if [[ ! -z $program_args ]] ; then
  programArgsToUse="?programArg=$program_args"
fi
 
so my eventual curl looks like this. But, I'm really just guessing what the detached argument is...
 
curl --cacert /etc/ssl/tester/certs/ca.crt -X POST -H 'Content-Type: application/json' 'https://tester-minimal-tls-sample-jobmanager:8081/jars/fdc7684f-323d-49fa-a60a-96683d953be8_WordCount.jar/run?programArg=detached=false'
 
(obviously, what's at the end looks really wrong, but IDK what to use)
 
The only mention of "detach" I see documented is at https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html, and that says by default it's not in detached mode for execution. If there are any better docs or examples, please send them my way - or if you've spotted me being just plain silly with my bash, that would be fantastic to point out.
 
Thanks a lot in advance, cheers, happy to share any more code/background if need be
 
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU




Re: Batch with Flink Streaming API version 1.12.0

2021-01-05 Thread Arvid Heise
Robert,

here I modified your example with some highlights.

final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers("kafka-headless:9092")
        .setTopics(Arrays.asList("log-input"))
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setBounded(OffsetsInitializer.latest()) // highlighted: setBounded replaces setUnbounded
        .build();

env.addSource(source);

You can also set the runtime mode explicitly, but that shouldn't be necessary
(and may make things more complicated once you also want to execute the
application in streaming mode).

env.setRuntimeMode(RuntimeExecutionMode.BATCH);
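
As a rough illustration of the "automatic batch detection" mentioned in this
thread: as far as I understand the 1.12 execution mode docs, the automatic
mode (RuntimeExecutionMode.AUTOMATIC) picks batch only if every source is
bounded. The sketch below is plain Java and not Flink code; the class and enum
names are made up for illustration and only mirror that rule.

```java
import java.util.List;

// Illustrative only: a stand-alone model of the "all sources bounded => batch" rule.
class RuntimeModeSketch {
    enum Boundedness { BOUNDED, CONTINUOUS_UNBOUNDED }
    enum Mode { STREAMING, BATCH }

    // Resolves the automatic mode: batch only when every source is bounded.
    static Mode resolveAutomatic(List<Boundedness> sources) {
        boolean allBounded = sources.stream().allMatch(b -> b == Boundedness.BOUNDED);
        return allBounded ? Mode.BATCH : Mode.STREAMING;
    }
}
```

With a single Kafka source built via setBounded(...), the rule resolves to
batch; one unbounded source is enough to fall back to streaming.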


On Tue, Jan 5, 2021 at 4:51 PM Robert Cullen  wrote:

> Arvid,
>
> Thanks. Can you show me an example of how the source is tied to the
> ExecutionEnvironment?
>
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
>
> KafkaSource<String> source = KafkaSource
>         .<String>builder()
>         .setBootstrapServers("kafka-headless:9092")
>         .setTopics(Arrays.asList("log-input"))
>         .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
>         .setUnbounded(OffsetsInitializer.latest())
>         .build();
>
> env.addSource(source);
>
>
> On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise  wrote:
>
>> Hi Robert,
>>
>> you basically just (re)write your application with DataStream API, use
>> the new KafkaSource, and then let the automatic batch detection mode work
>> [1].
>> The most important part is that all your sources need to be bounded.
>> Assuming that you just have a Kafka source, you need to use setBounded
>> with the appropriate end offset/timestamp.
>>
>> Note that the rewritten Kafka source still has a couple of issues that
>> should be addressed by the first bugfix release of 1.12 in this month. So
>> while it's safe to use for development, I'd wait for 1.12.1 to roll it out
>> on production.
>>
>> If you have specific questions on the migration from DataSet and
>> DataStream, please let me know.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>>
>> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen 
>> wrote:
>>
>>> I have a Kafka source that I would like to run a batch job on.  Since
>>> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
>>> DataStream API, can someone show me an example of this? (Using DataStream)
>>>
>>> thanks
>>> --
>>> Robert Cullen
>>> 240-475-4490
>>>
>>
>>
>>
>
>
> --
> Robert Cullen
> 240-475-4490
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Submitting a job in non-blocking mode using curl and the REST API

2021-01-05 Thread Chesnay Schepler
All jobs going through the web-submission are run in detached mode for 
technical reasons (blocking of threads, and information having to be 
transported back to the JobManager for things like collect()).


You unfortunately cannot run non-detached/attached/blocking jobs via the 
web submission, which includes the WordCount example because it uses 
specific methods (the ones mentioned in the exception; collect, print, 
printToErr, count).


In other words, your setup appears to be working correctly; you are just 
trying to do something that is not supported.


On 1/5/2021 4:07 PM, Adam Roberts wrote:
Hey everyone, I've got an awesome looking Flink cluster set up 
with web.submit.enable=true, and plenty of bash for handling jar upload 
and then submission to a JobManager - all good so far.
Unfortunately, when I try to submit the classic WordCount example, I 
get a massive error with the gist of it being:
"Job was submitted in detached mode. Results of job execution, such 
as accumulators, runtime, etc. are not available. Please make sure 
your program doesn't call an eager execution function [collect, print, 
printToErr, count]."

So, how do I run it *not* in detached mode using curl please?
I'm intentionally not using the Flink CLI because I am using an nginx 
with auth proxy set up - so I'm doing everything with curl, in a bash 
script - (so, two requests - one to upload the jar, then I get the ID 
from the response, and then submit the job with that ID).
At 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html if 
you ctrl-f for /run, there's nothing obvious that indicates how I can 
run in blocking mode - the biggest clue I've got is `programArg`. So 
I'm wondering if I can provide that somehow.

For those who prefer code:
curl ${auth_options} ${self_signed_flag} ${ca_cert_flag} -X POST -H "Content-Type: application/json" https://${JOBMANAGER}/jars/${uploaded_jar_string}/run$programArgsToUse
Whereby programArgsToUse is user args, and I'm cool with them being 
query parameters for now - I think.

I'm passing them on the end with:

if [[ ! -z $program_args ]] ; then
  programArgsToUse="?programArg=$program_args"
fi
so my eventual curl looks like this. But, I'm really just guessing 
what the detached argument is...
curl --cacert /etc/ssl/tester/certs/ca.crt -X POST -H 'Content-Type: application/json' 'https://tester-minimal-tls-sample-jobmanager:8081/jars/fdc7684f-323d-49fa-a60a-96683d953be8_WordCount.jar/run?programArg=detached=false'

(obviously, what's at the end looks really wrong, but IDK what to use)
The only mention of "detach" I see documented is at 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html, 
and that says by default it's not in detached mode for execution. If 
there are any better docs or examples, please send them my way - or if 
you've spotted me being just plain silly with my bash, that would be 
fantastic to point out.
Thanks a lot in advance, cheers, happy to share any more 
code/background if need be

Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with 
number 741598.

Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU





Re: Batch with Flink Streaming API version 1.12.0

2021-01-05 Thread Robert Cullen
Arvid,

Thanks. Can you show me an example of how the source is tied to the
ExecutionEnvironment?

final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

KafkaSource<String> source = KafkaSource
        .<String>builder()
        .setBootstrapServers("kafka-headless:9092")
        .setTopics(Arrays.asList("log-input"))
        .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
        .setUnbounded(OffsetsInitializer.latest())
        .build();

env.addSource(source);


On Tue, Jan 5, 2021 at 7:28 AM Arvid Heise  wrote:

> Hi Robert,
>
> you basically just (re)write your application with DataStream API, use the
> new KafkaSource, and then let the automatic batch detection mode work [1].
> The most important part is that all your sources need to be bounded.
> Assuming that you just have a Kafka source, you need to use setBounded
> with the appropriate end offset/timestamp.
>
> Note that the rewritten Kafka source still has a couple of issues that
> should be addressed by the first bugfix release of 1.12 in this month. So
> while it's safe to use for development, I'd wait for 1.12.1 to roll it out
> on production.
>
> If you have specific questions on the migration from DataSet and
> DataStream, please let me know.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
>
> On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen 
> wrote:
>
>> I have a Kafka source that I would like to run a batch job on.  Since
>> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
>> DataStream API, can someone show me an example of this? (Using DataStream)
>>
>> thanks
>> --
>> Robert Cullen
>> 240-475-4490
>>
>
>
>


-- 
Robert Cullen
240-475-4490


"flink list" fails when zk-based HA is enabled in flink-conf.yaml

2021-01-05 Thread Dongwon Kim
Hi,

I'm using Flink-1.12.0 and running on Hadoop YARN.

After setting HA-related properties in flink-conf.yaml,

high-availability: zookeeper

high-availability.zookeeper.path.root: /recovery

high-availability.zookeeper.quorum: nm01:2181,nm02:2181,nm03:2181

high-availability.storageDir: hdfs:///flink/recovery

the following command hangs and fails:

$ flink list --target yarn-per-job -Dyarn.application.id=$application_id

Before setting the properties, I can see the following lines after
executing the above command:

2021-01-06 00:11:48,961 INFO
org.apache.flink.runtime.security.modules.HadoopModule
  [] - Hadoop user set to deploy (auth:SIMPLE)

2021-01-06 00:11:48,968 INFO
org.apache.flink.runtime.security.modules.JaasModule
[] - Jaas file will be created as
/tmp/jaas-8522045433029410483.conf.

2021-01-06 00:11:48,976 INFO  org.apache.flink.client.cli.CliFrontend
[] - Running 'list' command.

2021-01-06 00:11:49,316 INFO  org.apache.hadoop.yarn.client.AHSProxy
[] - Connecting to Application History server at nm02/
10.93.0.91:10200

2021-01-06 00:11:49,324 INFO  org.apache.flink.yarn.YarnClusterDescriptor
[] - No path for the flink jar passed. Using the location
of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2021-01-06 00:11:49,333 WARN  org.apache.flink.yarn.YarnClusterDescriptor
[] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR
environment variable is set.The Flink YARN Client needs one of these to be
set to properly load the Hadoop configuration for accessing YARN.

2021-01-06 00:11:49,404 INFO  org.apache.flink.yarn.YarnClusterDescriptor
[] - Found Web Interface dn03:37098 of application
'application_1600163418174_0127'.

2021-01-06 00:11:49,758 INFO  org.apache.flink.client.cli.CliFrontend
[] - Waiting for response...

Waiting for response...

2021-01-06 00:11:49,863 INFO  org.apache.flink.client.cli.CliFrontend
[] - Successfully retrieved list of jobs

-- Running/Restarting Jobs ---

31.12.2020 01:22:34 : 76fc265c44ef44ae343ab15868155de6 : stream calculator
(RUNNING)

--

No scheduled jobs.

After:

2021-01-06 00:06:38,971 INFO
org.apache.flink.runtime.security.modules.HadoopModule
  [] - Hadoop user set to deploy (auth:SIMPLE)

2021-01-06 00:06:38,976 INFO
org.apache.flink.runtime.security.modules.JaasModule
[] - Jaas file will be created as
/tmp/jaas-3613274701724362777.conf.

2021-01-06 00:06:38,982 INFO  org.apache.flink.client.cli.CliFrontend
[] - Running 'list' command.

2021-01-06 00:06:39,304 INFO  org.apache.hadoop.yarn.client.AHSProxy
[] - Connecting to Application History server at nm02/
10.93.0.91:10200

2021-01-06 00:06:39,312 INFO  org.apache.flink.yarn.YarnClusterDescriptor
[] - No path for the flink jar passed. Using the location
of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar

2021-01-06 00:06:39,320 WARN  org.apache.flink.yarn.YarnClusterDescriptor
[] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR
environment variable is set.The Flink YARN Client needs one of these to be
set to properly load the Hadoop configuration for accessing YARN.

2021-01-06 00:06:39,388 INFO  org.apache.flink.yarn.YarnClusterDescriptor
[] - Found Web Interface dn03:37098 of application
'application_1600163418174_0127'.

2021-01-06 00:06:39,399 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
  [] - Enforcing default ACL for ZK connections

2021-01-06 00:06:39,399 INFO  org.apache.flink.runtime.util.ZooKeeperUtils
  [] - Using '/recovery/default' as Zookeeper namespace.

2021-01-06 00:06:39,425 INFO
org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility
[] - Running in ZooKeeper 3.4.x compatibility mode

2021-01-06 00:06:39,425 INFO
org.apache.flink.shaded.curator4.org.apache.curator.utils.Compatibility
[] - Using emulated InjectSessionExpiration

2021-01-06 00:06:39,447 INFO
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl
[] - Starting

2021-01-06 00:06:39,455 INFO
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper
[] - Initiating client connection, connectString=nm01:2181,nm02:2181,nm03:2181
sessionTimeout=6
watcher=org.apache.flink.shaded.curator4.org.apache.curator.ConnectionState@7668d560

2021-01-06 00:06:39,466 INFO
org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorFrameworkImpl
[] - Default schema

2021-01-06 00:06:39,466 WARN
org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ClientCnxn
[] - SASL configuration failed: javax.security.auth.login.LoginException:
No JAAS configuration section named 'Client' was found in specified JAAS
configuration file: '/tmp/jaas-3613274701724362777.conf'. Will continue
connec

Submitting a job in non-blocking mode using curl and the REST API

2021-01-05 Thread Adam Roberts
Hey everyone, I've got an awesome looking Flink cluster set up with web.submit.enable=true, and plenty of bash for handling jar upload and then submission to a JobManager - all good so far.
 
Unfortunately, when I try to submit the classic WordCount example, I get a massive error with the gist of it being:
 
"Job was submitted in detached mode. Results of job execution, such as accumulators, runtime, etc. are not available. Please make sure your program doesn't call an eager execution function [collect, print, printToErr, count]."
 
So, how do I run it *not* in detached mode using curl please?
 
I'm intentionally not using the Flink CLI because I am using an nginx with auth proxy set up - so I'm doing everything with curl, in a bash script - (so, two requests - one to upload the jar, then I get the ID from the response, and then submit the job with that ID).
 
At https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/rest_api.html if you ctrl-f for /run, there's nothing obvious that indicates how I can run in blocking mode - the biggest clue I've got is `programArg`. So I'm wondering if I can provide that somehow.
 
For those who prefer code:
 
curl ${auth_options} ${self_signed_flag} ${ca_cert_flag} -X POST -H "Content-Type: application/json" https://${JOBMANAGER}/jars/${uploaded_jar_string}/run$programArgsToUse
 
Whereby programArgsToUse is user args, and I'm cool with them being query parameters for now - I think.
 
I'm passing them on the end with:
 
if [[ ! -z $program_args ]] ; then
  programArgsToUse="?programArg=$program_args"
fi
 
so my eventual curl looks like this. But, I'm really just guessing what the detached argument is...
 
curl --cacert /etc/ssl/tester/certs/ca.crt -X POST -H 'Content-Type: application/json' 'https://tester-minimal-tls-sample-jobmanager:8081/jars/fdc7684f-323d-49fa-a60a-96683d953be8_WordCount.jar/run?programArg=detached=false'
 
(obviously, what's at the end looks really wrong, but IDK what to use)
 
The only mention of "detach" I see documented is at https://ci.apache.org/projects/flink/flink-docs-stable/deployment/config.html, and that says by default it's not in detached mode for execution. If there are any better docs or examples, please send them my way - or if you've spotted me being just plain silly with my bash, that would be fantastic to point out.
 
Thanks a lot in advance, cheers, happy to share any more code/background if need be
 
Unless stated otherwise above:
IBM United Kingdom Limited - Registered in England and Wales with number 741598. 
Registered office: PO Box 41, North Harbour, Portsmouth, Hampshire PO6 3AU
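
A side note on the URL building in the message above: the bash snippet appends
$program_args unescaped. Below is a minimal Java sketch of assembling the
/jars/<jarId>/run URL with an encoded programArg value. It assumes, from my
reading of the REST API docs, that programArg takes a comma-separated list of
program arguments. The class name JarRunUrl and the host/jar names used with it
are made up for illustration. It only builds the URL; it does not change how
the job is executed, so it is no way around detached mode.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Hypothetical helper: builds the URL for POST /jars/<jarId>/run.
class JarRunUrl {
    static String build(String baseUrl, String jarId, List<String> programArgs) {
        String url = baseUrl + "/jars/" + jarId + "/run";
        if (programArgs.isEmpty()) {
            return url;
        }
        // Assumption: programArg is a comma-separated list of program arguments.
        String joined = String.join(",", programArgs);
        // URLEncoder.encode(String, Charset) needs Java 10 or newer.
        return url + "?programArg=" + URLEncoder.encode(joined, StandardCharsets.UTF_8);
    }
}
```

For example, build("https://jobmanager:8081", "abc_WordCount.jar",
List.of("--input", "/tmp/in.txt")) yields a URL whose query string is
programArg=--input%2C%2Ftmp%2Fin.txt.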




Re: UDTAGG and SQL

2021-01-05 Thread Timo Walther
A subquery could work but since you want to implement a UDTAGG anyway, 
you can also move the implementation there and keep the SQL query 
simple. But this is up to you. Consecutive windows are supported.


Regards,
Timo
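
To make "move the implementation there" a bit more concrete, here is a
plain-Java sketch of an accumulator that sums the cost per name itself, so the
query would not need a nested SUM(cost). The thread never shows what
PurchaseWindow looks like, so the fields and the total()/size() helpers below
are assumptions for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical accumulator: the real PurchaseWindow is not shown in this thread.
class PurchaseWindow {
    private final Map<String, Double> totalByName = new LinkedHashMap<>();

    // Called once per row from accumulate(); adds the cost to the running total for that name.
    void add(String name, double cost) {
        totalByName.merge(name, cost, Double::sum);
    }

    // Running total for one name, or 0.0 if the name was never added.
    double total(String name) {
        return totalByName.getOrDefault(name, 0.0);
    }

    // Number of distinct names seen so far.
    int size() {
        return totalByName.size();
    }
}
```

With this shape, accumulate(acc, name, cost) can keep calling acc.add(name,
cost) on the raw rows, and the per-name sums are available when the value is
emitted.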


On 05.01.21 15:23, Marco Villalobos wrote:

Hi Timo,

Thank you for the quick response.

Neither COLLECT nor LISTAGG work because they only accept one column.

I am trying to collect all the rows and columns into one object, like a 
List for example.
Later, I need to make calculations upon all the rows that were just collected 
within a window.

Maybe I need to use a subquery, i.e., SELECT FROM (SELECT FROM)?


On Jan 5, 2021, at 6:10 AM, Timo Walther  wrote:

Hi Marco,

nesting aggregate functions is not allowed in SQL. The exception could be 
improved though. I guess the planner searches for a scalar function called 
`MyUDTAGG` in your example and cannot find one.

Maybe the built-in function `COLLECT` or `LISTAGG` is what you are looking for?

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html

Regards,
Timo


On 05.01.21 14:45, Marco Villalobos wrote:

  I am trying to use User defined Table Aggregate function directly in the SQL 
so that I could combine all the rows collected in a window into one object.
GIVEN a User defined Table Aggregate function
public class MyUDTAGG extends
        TableAggregateFunction<PurchaseWindow, PurchaseWindow> {
    public PurchaseWindow createAccumulator() {
        return new PurchaseWindow();
    }
    public void accumulate(PurchaseWindow acc, String name, double cost) {
        acc.add(name, cost);
    }
    public void emitValue(PurchaseWindow acc, Collector<PurchaseWindow> out) {
        out.collect(acc);
    }
}
THAT it is registered as
StreamTableEnvironment tEnv = ...
tEnv.registerFunction("MyUDTAGG", new MyUDTAGG());
THEN is it possible to call it in an SQL query in this manner?
SELECT MyUDTAGG(name, SUM(cost)) AS purchase_window
FROM purchases
GROUP BY TUMBLE(proactive, INTERVAL '1' DAY), name
I am receiving an SQL validation error, "No match found for function signature 
...".
What am I doing wrong, or is there a better way to do this?








Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Yun Gao
 Hi Arvid,

 Thanks a lot for the feedback!

 For the second issue, sorry, I think I did not make it very clear. I was
initially thinking of the following case: for a job with graph A -> B -> C,
when we compute which tasks to trigger, A is still running, so we trigger A to
start the checkpoint. However, before the trigger message reaches A, A
finishes, and the trigger message fails because the task cannot be found. If we
do not handle this case, the checkpoint fails due to timeout. By default a
failed checkpoint causes job failure, and we would also need to wait one
checkpoint interval for the next checkpoint. One solution would be to check all
pending checkpoints and trigger B instead when the JM is notified that A has
finished.
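
To illustrate the A -> B -> C case, here is a small plain-Java sketch of
recomputing which tasks to trigger once some tasks have finished. It is not
Flink's checkpoint coordinator code; the class name TriggerTargets and the rule
it applies (trigger every unfinished task whose upstream tasks have all
finished) are assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative model only, not the PoC implementation.
class TriggerTargets {
    // upstream maps each task to the tasks feeding it; sources map to an empty list.
    static List<String> compute(Map<String, List<String>> upstream, Set<String> finished) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : upstream.entrySet()) {
            if (finished.contains(e.getKey())) {
                continue; // a finished task can no longer receive the trigger
            }
            if (finished.containsAll(e.getValue())) {
                targets.add(e.getKey()); // no running predecessor left, so this task starts the checkpoint
            }
        }
        return targets;
    }
}
```

While A is running the result is [A]; once the JM learns that A has finished,
recomputing for the pending checkpoint gives [B], which matches the idea of
triggering B instead.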

 For the third issue, it should work if we store a special value for some
field in OperatorState or OperatorSubtaskState. For example, we might store a
special subtaskState map inside the OperatorState to mark it as finished, since
a finished operator should always have an empty state. Thanks a lot for the
advice! I'll try this method.

Best,
 Yun



--
From:Arvid Heise 
Send Time:2021 Jan. 5 (Tue.) 17:16
To:Yun Gao 
Cc:Aljoscha Krettek ; dev ; user 

Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately. My 
gut feeling says that this is something we should only address for new sinks 
where we decouple the semantics of commits and checkpoints anyways. @Aljoscha 
Krettek any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source partition 
that is finished before the first checkpoint. Then, we would need to store the 
finished state of the subtask somehow. So I'm assuming, we still need to 
trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was 
assuming we want to have it more fine-grained on OperatorSubtaskState. Maybe we 
can store the flag inside managed or raw state without changing the format?



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao  wrote:

   Hi all,

 I tested the previous PoC with the current tests and found some new 
issues that might cause divergence. Sorry, this may also reverse some 
earlier conclusions:

 1. Which operators should wait for one more checkpoint before close?

 One motivation for this FLIP is to ensure that the 2PC sink commits the 
last part of the data before it is closed, which means the sink operator needs 
to wait for one more checkpoint, like onEndOfInput() -> waitForCheckpoint() -> 
notifyCheckpointComplete() -> close(). This raises the question of which 
operators should wait for a checkpoint. Possible options are:
 a. Make all operators (or UDFs) that implement the 
notifyCheckpointComplete method wait for one more checkpoint. One exception: 
since we can only snapshot one or all tasks for a legacy source operator to 
avoid data repetition [1], we cannot support legacy operators and their 
chained operators waiting for checkpoints, because there would be a deadlock 
if part of the tasks are finished. This would finally be solved after legacy 
sources are deprecated. The PoC uses this option for now.
 b. Make operators (or UDFs) that implement a special marker interface 
wait for one more checkpoint.


 2. Do we need to handle the case where tasks finish before being triggered?

 Previously I thought we could postpone it. However, during testing I found 
that it might cause some problems, since by default a checkpoint failure 
causes job failover, and the job would also need to wait another interval to 
trigger the next checkpoint. To pass the tests, I updated the PoC to include 
this part, and we may want to think again about whether to include it or use 
some other option.

 3. How to extend the checkpoint metadata with a new format?

 Sorry, I previously gave a wrong estimate. After I extracted a sub-component 
to (de)serialize operator state, I found the problem just moves to the new 
OperatorStateSerializer. The problem seems to be that v2, v3 and v4 have 
different fields and thus use different processes to (de)serialize, which is 
a bit different from the case where we have fixed steps and each step has 
different logic. Thus we might either
 a. Use base classes for each two versions.
 b. Or have a unified framework that contains all possible fields across all 
versions, and use an empty field serializer to skip some fields in each version.

Best,
Yun

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks

--

Re: UDTAGG and SQL

2021-01-05 Thread Marco Villalobos
Hi Timo,

Thank you for the quick response.

Neither COLLECT nor LISTAGG work because they only accept one column.

I am trying to collect all the rows and columns into one object, like a 
List for example.
Later, I need to make calculations upon all the rows that were just collected 
within a window.

Maybe I need to use a subquery, i.e., SELECT FROM (SELECT FROM)?

> On Jan 5, 2021, at 6:10 AM, Timo Walther  wrote:
> 
> Hi Marco,
> 
> nesting aggregate functions is not allowed in SQL. The exception could be 
> improved though. I guess the planner searches for a scalar function called 
> `MyUDTAGG` in your example and cannot find one.
> 
> Maybe the built-in function `COLLECT` or `LISTAGG` is what you are looking for?
> 
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html
> 
> Regards,
> Timo
> 
> 
> On 05.01.21 14:45, Marco Villalobos wrote:
>>  I am trying to use User defined Table Aggregate function directly in the 
>> SQL so that I could combine all the rows collected in a window into one 
>> object.
>> GIVEN a User defined Table Aggregate function
>> public class MyUDTAGG extends
>>         TableAggregateFunction<PurchaseWindow, PurchaseWindow> {
>>     public PurchaseWindow createAccumulator() {
>>         return new PurchaseWindow();
>>     }
>>     public void accumulate(PurchaseWindow acc, String name, double cost) {
>>         acc.add(name, cost);
>>     }
>>     public void emitValue(PurchaseWindow acc, Collector<PurchaseWindow> out) {
>>         out.collect(acc);
>>     }
>> }
>> THAT it is registered as
>> StreamTableEnvironment tEnv = ...
>> tEnv.registerFunction("MyUDTAGG", new MyUDTAGG());
>> THEN is it possible to call it in an SQL query in this manner?
>> SELECT MyUDTAGG(name, SUM(cost)) AS purchase_window
>> FROM purchases
>> GROUP BY TUMBLE(proactive, INTERVAL '1' DAY), name
>> I am receiving an SQL validation error, "No match found for function 
>> signature ...".
>> What am I doing wrong, or is there a better way to do this?
> 



Re: UDTAGG and SQL

2021-01-05 Thread Timo Walther

Hi Marco,

nesting aggregate functions is not allowed in SQL. The exception could 
be improved though. I guess the planner searches for a scalar function 
called `MyUDTAGG` in your example and cannot find one.


Maybe the built-in function `COLLECT` or `LISTAGG` is what you are 
looking for?


https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/systemFunctions.html

Regards,
Timo


On 05.01.21 14:45, Marco Villalobos wrote:

  I am trying to use User defined Table Aggregate function directly in the SQL 
so that I could combine all the rows collected in a window into one object.

GIVEN a User defined Table Aggregate function

public class MyUDTAGG extends
        TableAggregateFunction<PurchaseWindow, PurchaseWindow> {

    public PurchaseWindow createAccumulator() {
        return new PurchaseWindow();
    }

    public void accumulate(PurchaseWindow acc, String name, double cost) {
        acc.add(name, cost);
    }

    public void emitValue(PurchaseWindow acc, Collector<PurchaseWindow> out) {
        out.collect(acc);
    }
}

THAT it is registered as

StreamTableEnvironment tEnv = ...
tEnv.registerFunction("MyUDTAGG", new MyUDTAGG());

THEN is it possible to call it in an SQL query in this manner?

SELECT MyUDTAGG(name, SUM(cost)) AS purchase_window
FROM purchases
GROUP BY TUMBLE(proactive, INTERVAL '1' DAY), name


I am receiving an SQL validation error, "No match found for function signature 
...".

What am I doing wrong, or is there a better way to do this?







UDTAGG and SQL

2021-01-05 Thread Marco Villalobos
I am trying to use a user-defined table aggregate function directly in SQL
so that I can combine all the rows collected in a window into one object.

GIVEN a User defined Table Aggregate function

public class MyUDTAGG extends
        TableAggregateFunction<PurchaseWindow, PurchaseWindow> {

    public PurchaseWindow createAccumulator() {
        return new PurchaseWindow();
    }

    public void accumulate(PurchaseWindow acc, String name, double cost) {
        acc.add(name, cost);
    }

    public void emitValue(PurchaseWindow acc, Collector<PurchaseWindow> out) {
        out.collect(acc);
    }
}

THAT it is registered as

StreamTableEnvironment tEnv = ...
tEnv.registerFunction("MyUDTAGG", new MyUDTAGG());

THEN is it possible to call it in an SQL query in this manner?

SELECT MyUDTAGG(name, SUM(cost)) AS purchase_window
FROM purchases
GROUP BY TUMBLE(proactive, INTERVAL '1' DAY), name


I am receiving an SQL validation error, "No match found for function signature 
...".

What am I doing wrong, or is there a better way to do this?





Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2021-01-05 Thread 赵一旦
I think what you need is
http://kafka.apache.org/documentation/#consumerconfigs_isolation.level .

The isolation.level setting's default value is read_uncommitted. So, maybe
you do not use the default setting?
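
For illustration, here is a minimal sketch of how that setting could be
supplied on the consumer side. The class name, group id and broker address
are made-up placeholders; only the keys `bootstrap.servers`, `group.id` and
`isolation.level` are real Kafka consumer settings. It uses nothing but
java.util.Properties, so it is not tied to any particular connector version.

```java
import java.util.Properties;

/** Hypothetical helper for illustration only; not part of Flink or Kafka. */
final class IsolationLevelExample {

    /**
     * Builds consumer properties with an explicit isolation level.
     * "read_committed" hides records of open or aborted transactions,
     * "read_uncommitted" (the Kafka default) returns them as soon as they are written.
     */
    static Properties consumerProps(String groupId, boolean readCommitted) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", groupId);
        props.setProperty("isolation.level",
                readCommitted ? "read_committed" : "read_uncommitted");
        return props;
    }
}
```

Handing the resulting properties to the consumer of the second job (or to any
other Kafka consumer) decides whether it sees the first job's records before
or only after the transaction is committed.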

On Tue, Jan 5, 2021 at 9:10 PM, 赵一旦 wrote:

> I do not have this problem, so I guess it is related with the config of
> your kafka producer and consumer, and maybe kafka topic properties or kafka
> server properties also.
>
> On Tue, Jan 5, 2021 at 6:47 PM, Arvid Heise wrote:
>
>> Hi Daniel,
>>
>> Flink commits transactions on checkpoints while Kafka Streams/connect
>> usually commits on record. This is the typical tradeoff between Throughput
>> and Latency. By decreasing the checkpoint interval in Flink, you can reach
>> comparable latency to Kafka Streams.
>>
>> If you have two exactly once jobs, the second job may only read data that
>> has been committed (not dirty as Chesnay said). If the second job were to
>> consume data that is uncommitted, it will result in duplicates, in case the
>> first job fails and rolls back.
>>
>> You can configure the read behavior with isolation.level. If you want to
>> implement exactly once behavior, you also need to set that level in your
>> other Kafka consumers [1]. Also compare what Kafka Streams is setting if
>> you want to go exactly once [2].
>>
>> If you really want low latency, please also double-check if you really
>> need exactly once.
>>
>> [1]
>> https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
>> [2]
>> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee
>>
>> On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler 
>> wrote:
>>
>>> I don't know our Kafka connector particularly well, but it sounds like a
>>> matter of whether a given consumer does dirty reads.
>>> Flink does not, whereas the other tools you're using do.
>>>
>>> On 12/28/2020 7:57 AM, Daniel Peled wrote:
>>>
>>> Hello,
>>>
>>> We have 2 flink jobs that communicate with each other through a KAFKA
>>> topic.
>>> Both jobs use checkpoints with EXACTLY ONCE semantic.
>>>
>>> We have seen the following behaviour and we want to make sure and ask if
>>> this is the expected behaviour or maybe it is a bug.
>>>
>>> When the first job produces a message to KAFKA, the message is consumed
>>> by the second job with a latency that depends on the *first* job's
>>> *checkpoint interval*.
>>>
>>> We are able to read the message using the kafka tool or using another
>>> kafka consumer, but NOT with a flink kafka consumer, whose reads again
>>> depend on the checkpoint interval of the first job.
>>>
>>> How come the consumer of the second job depends on the producer
>>> transaction commit time of the first job ?
>>>
>>> BR,
>>> Danny
>>>
>>>
>>>
>>
>> --
>>
>> Arvid Heise | Senior Java Developer
>>
>> 
>>
>> Follow us @VervericaData
>>
>> --
>>
>> Join Flink Forward  - The Apache Flink
>> Conference
>>
>> Stream Processing | Event Driven | Real Time
>>
>> --
>>
>> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>>
>> --
>> Ververica GmbH
>> Registered at Amtsgericht Charlottenburg: HRB 158244 B
>> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
>> (Toni) Cheng
>>
>


Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2021-01-05 Thread 赵一旦
I do not have this problem, so I guess it is related with the config of
your kafka producer and consumer, and maybe kafka topic properties or kafka
server properties also.

On Tue, Jan 5, 2021 at 6:47 PM, Arvid Heise wrote:

> Hi Daniel,
>
> Flink commits transactions on checkpoints while Kafka Streams/connect
> usually commits on record. This is the typical tradeoff between Throughput
> and Latency. By decreasing the checkpoint interval in Flink, you can reach
> comparable latency to Kafka Streams.
>
> If you have two exactly once jobs, the second job may only read data that
> has been committed (not dirty as Chesnay said). If the second job were to
> consume data that is uncommitted, it will result in duplicates, in case the
> first job fails and rolls back.
>
> You can configure the read behavior with isolation.level. If you want to
> implement exactly once behavior, you also need to set that level in your
> other Kafka consumers [1]. Also compare what Kafka Streams is setting if
> you want to go exactly once [2].
>
> If you really want low latency, please also double-check if you really
> need exactly once.
>
> [1]
> https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
> [2]
> https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee
>
> On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler 
> wrote:
>
>> I don't know our Kafka connector particularly well, but it sounds like a
>> matter of whether a given consumer does dirty reads.
>> Flink does not, whereas the other tools you're using do.
>>
>> On 12/28/2020 7:57 AM, Daniel Peled wrote:
>>
>> Hello,
>>
>> We have 2 flink jobs that communicate with each other through a KAFKA
>> topic.
>> Both jobs use checkpoints with EXACTLY ONCE semantic.
>>
>> We have seen the following behaviour and we want to make sure and ask if
>> this is the expected behaviour or maybe it is a bug.
>>
>> When the first job produces a message to KAFKA, the message is consumed
>> by the second job with a latency that depends on the *first* job's
>> *checkpoint interval*.
>>
>> We are able to read the message using the kafka tool or using another
>> kafka consumer, but NOT with a flink kafka consumer, whose reads again
>> depend on the checkpoint interval of the first job.
>>
>> How come the consumer of the second job depends on the producer
>> transaction commit time of the first job ?
>>
>> BR,
>> Danny
>>
>>
>>
>


Re: Flink SQL, temporal joins and backfilling data

2021-01-05 Thread Timo Walther

Hi Dan,

are you sure that your watermarks are still correct during reprocessing? 
As far as I know, idle state retention is not used for temporal joins. 
The watermark indicates when state can be removed in this case.


Maybe you can give us some more details about which kind of temporal 
join you are using (event-time or processing-time?) and checkpoint settings?


Regards,
Timo

On 30.12.20 08:30, Dan Hill wrote:

Hi!

I have a Flink SQL job that does a few temporal joins and has been 
running for over a month on regular data.  No issues.  Ran well.


I'm trying to re-run the Flink SQL job on the same data set but it's 
failing to checkpoint and very slow to make progress.  I've modified 
some of the checkpoint settings.


What else do I have to modify?

My data size is really small so I'm guessing it's still keeping state 
for data outside the temporal join time windows.  Do I have to set Idle 
State Retention Time to forget older data?


- Dan




Re: Comparing Flink vs Materialize

2021-01-05 Thread Arvid Heise
Hi Dan,

I have not touched Materialize yet, but their picture of Flink is too
simplistic.
When you run Flink in parallel, then each source shard is assigned to one
Flink source operator. Similarly, filter and map would run in parallel.
Flink chains simple operators that have the same degree of parallelism by
default, making them run on the same core. [1]

That means that their example looks exactly the same in Flink if you run
Flink with parallelism 4. You need to exchange data only for aggregations
with different keys (count in their example). The same is true for Kafka
Streams btw.

So while there may be differences in how Materialize and Flink work, this
example is not suitable to depict them.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/flink-architecture.html#tasks-and-operator-chains

On Tue, Jan 5, 2021 at 2:03 AM Dan Hill  wrote:

> Has anyone compared Flink with Materialize?  A friend recommended me
> switch to Materialize.
>
> In one of their blog posts, it says that Flink splits operators across
> CPUs (instead of splitting partitions across CPUs).  Is this true?  Is it
> configurable?
>
> https://materialize.com/blog-rocksdb/
>
>



Re: Kafka SQL Connector Behavior (v1.11)

2021-01-05 Thread Arvid Heise
Hi Aeden,

I just checked the code and your assumption is correct. Without an explicit
partitioner, Flink just writes ProducerRecord without partition (null), so
that whatever Kafka usually does applies.

On Tue, Jan 5, 2021 at 1:53 AM Aeden Jameson 
wrote:

> Based on these docs,
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html
> ,
> the default partitioning behavior is not quite clear to me.
> If no value for sink-partitioner is given, is the default behavior
> just that of the native Kafka library? (with key use murmur2 , without
> key round robin)
>
> Thank you,
> Aeden
>




Re: Batch with Flink Steraming API version 1.12.0

2021-01-05 Thread Arvid Heise
Hi Robert,

you basically just (re)write your application with DataStream API, use the
new KafkaSource, and then let the automatic batch detection mode work [1].
The most important part is that all your sources need to be bounded.
Assuming that you just have a Kafka source, you need to use setBounded with
the appropriate end offset/timestamp.

Note that the rewritten Kafka source still has a couple of issues that
should be addressed by the first bugfix release of 1.12 this month. So
while it's safe to use for development, I'd wait for 1.12.1 before rolling
it out in production.

If you have specific questions on the migration from DataSet to
DataStream, please let me know.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html

On Mon, Jan 4, 2021 at 7:34 PM Robert Cullen  wrote:

> I have a Kafka source that I would like to run a batch job on.  Since
> Version 1.12.0 is now soft deprecating the DataSet API in favor of the
> DataStream API, can someone show me an example of this? (Using DataStream)
>
> thanks
> --
> Robert Cullen
> 240-475-4490
>




Re: Flink SQL - IntervalJoin doesn't support consuming update and delete - trying to deduplicate rows

2021-01-05 Thread Arvid Heise
Hi Dan,

Which Flink version are you using? I know that there has been quite a bit
of optimization of deduplication in 1.12, which would reduce the required
state tremendously.
I'm pulling in Jark who knows more.

On Thu, Dec 31, 2020 at 6:54 AM Dan Hill  wrote:

> Hi!
>
> I'm using Flink SQL to do an interval join.  Rows in one of the tables are
> not unique.  I'm fine using either the first or last row.  When I try to
> deduplicate and then interval join, I get the following error.
>
> IntervalJoin doesn't support consuming update and delete changes which is
> produced by node Rank(strategy=[UndefinedStrategy], rankType=[ROW_NUMBER],
> rankRange=[rankStart=1, rankEnd=1], partitionBy=[log_user_id], orderBy=[ts
> ASC], select=[platform_id, user_id, log_user_id, client_log_ts,
> event_api_ts, ts])
>
> Is there a way to combine these in this order?  I could do the
> deduplication afterwards but this will result in more state.
>
> - Dan
>




Re: Flink Logging on EMR

2021-01-05 Thread Arvid Heise
Hi KristoffSC,

taskmanager.out should only show the output of the process starting the
taskmanager. In most cases, you probably want to look into taskmanager.log.

On Tue, Dec 29, 2020 at 3:42 PM KristoffSC 
wrote:

> Hi Mars,
> Were you able to solve this problem?
>
> I'm facing the exact same issue. I don't see logs from my operators in the
> taskmanager output (taskmanager.out file) on EMR, although the logs are
> printed when running locally from the IDE.
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>




Re: Implementing a TarInputFormat based on FileInputFormat

2021-01-05 Thread Arvid Heise
Hi Billy,

the exception is happening on the output side. Input side looks fine. Could
you maybe post more information about the sink?

On Mon, Dec 28, 2020 at 8:11 PM Billy Bain  wrote:

> I am trying to implement a class that will work similar to AvroFileFormat.
>
> This tar archive has a very specific format. It has only one file inside
> and that file is line delimited JSON.
>
> I get this exception, but all the data is written to the temporary files.
> I have checked that my code isn't closing the stream, which was my prior
> issue.
>
> Caused by: java.nio.channels.ClosedChannelException
> at
> java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
> at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
> at
> org.apache.flink.core.fs.local.LocalRecoverableFsDataOutputStream.getPos(LocalRecoverableFsDataOutputStream.java:101)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.getSize(OutputStreamBasedPartFileWriter.java:70)
> at
> org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy.shouldRollOnCheckpoint(DefaultRollingPolicy.java:71)
> at
> org.apache.flink.connector.file.sink.writer.FileWriterBucket.prepareCommit(FileWriterBucket.java:195)
> at
> org.apache.flink.connector.file.sink.writer.FileWriter.prepareCommit(FileWriter.java:202)
> at
> org.apache.flink.streaming.runtime.operators.sink.AbstractSinkWriterOperator.endInput(AbstractSinkWriterOperator.java:97)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.endOperatorInput(StreamOperatorWrapper.java:91)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.lambda$close$0(StreamOperatorWrapper.java:127)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper$$Lambda$343/00.run(Unknown
> Source)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:127)
> at
> org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:134)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.closeOperators(OperatorChain.java:412)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:585)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:547)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
> at java.base/java.lang.Thread.run(Thread.java:836)
>
> public class TarInputFormat<E> extends FileInputFormat<E> implements
>         ResultTypeQueryable<E> {
>
>     private static final Logger logger =
>             LoggerFactory.getLogger(TarInputFormat.class);
>     private transient TarArchiveInputStream tarArchiveInputStream;
>     private TarArchiveEntry nextEntry;
>     private final Class<E> valueType;
>     private long currentPosition = 0L;
>     private static final ObjectMapper objectMapper = new ObjectMapper();
>
>     public TarInputFormat(Path filePath, Class<E> valueType) {
>         super(filePath);
>         this.valueType = valueType;
>         this.unsplittable = true;
>         this.setNumSplits(1);
>     }
>
>     @Override
>     public TypeInformation<E> getProducedType() {
>         return TypeExtractor.getForClass(this.valueType);
>     }
>
>     @Override
>     public void open(FileInputSplit split) throws IOException {
>         super.open(split);
>         tarArchiveInputStream = new TarArchiveInputStream(stream);
>         nextEntry = tarArchiveInputStream.getNextTarEntry();
>         logger.info("Entry Name={} size={}", nextEntry.getName(),
>                 nextEntry.getSize());
>     }
>
>     @Override
>     public void close() throws IOException {
>         super.close();
>         if (tarArchiveInputStream != null) {
>             tarArchiveInputStream.close();
>         }
>     }
>
>     @Override
>     public boolean reachedEnd() throws IOException {
>         return nextEntry == null || currentPosition == nextEntry.getSize();
>     }
>
>     @Override
>     public E nextRecord(E reuse) throws IOException {
>         if (reachedEnd()) {
>             return null;
>         }
>         logger.info("currentPosition={}", currentPosition);
>         int c;
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         while (currentPosition < nextEntry.getSize()) {
>             c = tarArchiveInputStream.read();
>             currentPosition++;
>             if (c == '\n') {
>                 break;
>             } else {
>                 bos.write(c);
>             }
>         }
>         return objectMapper.readValue(bos.toByteArray(), valueType);
>     }
>
> }
>
> Thanks.
>
> --
> Wayne D. Young
> aka Billy Bob Bain
> billybobb...@gmail.com
>



Re: Getting Exception in thread "main" java.util.concurrent.ExecutionException: scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot initialize the compiler

2021-01-05 Thread Arvid Heise
Hi Avi,

without being a scala-guy, I'm guessing that you are mixing scala versions.
Could you check that your user code uses the same scala version as Flink
(1.11 or 1.12)? I have also heard of issues with different minor versions
of scala, so make sure to use the exact same version (e.g. 2.11.12).

On Mon, Dec 28, 2020 at 3:54 PM Avi Levi  wrote:

> I am trying to aggregate all records in a time window. This is my
> ProcessAllWindowFunction :
>
> case class SimpleAggregate(elms: List[String])
>
> class AggregateLogs extends ProcessAllWindowFunction[String, SimpleAggregate, TimeWindow] {
>   override def process(context: Context, elements: Iterable[String], out: Collector[SimpleAggregate]): Unit = {
>     val es: List[String] = elements.toList
>     val record = SimpleAggregate(es)
>     out.collect(record)
>   }
> }
>
> But I am getting this exception why ?
>
> Exception in thread "main" java.util.concurrent.ExecutionException:
> scala.tools.reflect.ToolBoxError: reflective compilation has failed: cannot
> initialize the compiler due to java.lang.BootstrapMethodError:
> java.lang.NoSuchMethodError:
> scala.collection.immutable.List.$anonfun$flatMap$1$adapted(Lscala/runtime/BooleanRef;Lscala/runtime/ObjectRef;Lscala/runtime/ObjectRef;Ljava/lang/Object;)Ljava/lang/Object;
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:299)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:286)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
> at
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:137)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2348)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2320)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> at
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer$.compileCbf(TraversableSerializer.scala:184)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.compileCbf(TraversableSerializer.scala:51)
> at
> org.apache.flink.api.scala.typeutils.TraversableSerializer.<init>(TraversableSerializer.scala:41)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2$$anon$3.<init>(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1$$anon$2.createSerializer(HandleFinancialJob.scala:52)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1.$anonfun$createSerializer$1(HandleFinancialJob.scala:52)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
> at
> com.neosec.handlefinancial.HandleFinancialJob$$anon$1.createSerializer(HandleFinancialJob.scala:52)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.createSerializer(StreamGraph.java:864)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:308)
> at
> org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:293)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:680)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:253)
> at
> org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:212)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1863)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1848)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1699)
> at
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:699)
> at
> com.neosec.handlefinancial.HandleFinancialJob$.delayedEndpoint$com$neosec$handlefinancial$HandleFinancialJob$1(HandleFinancialJob.scala:60)
> at
> com.neosec.handlefinancial.HandleFinancialJob$delayedInit$body.apply(HandleFinancialJob.scala:20)
> at scala.Function0.apply$mcV$sp(Function0.scala:39)
> at scala.Function0.apply$mcV$sp$(Function0.scala:39)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
> at scala.App.$an

Re: read a tarred + gzipped file flink 1.12

2021-01-05 Thread Arvid Heise
Hi Billy,

I suspect that it's not possible in Flink as is. The tar file acts as a
directory containing an arbitrary number of files. Afaik, Flink assumes
that all compressed files are just single files, like gz without tar. That
is effectively your case, but then the tar part doesn't make much sense.

Since you cannot control the input, you have two options:
* External process that unpacks the file and then calls Flink.
* Implement your own input format similar to [1].

[1]
https://stackoverflow.com/questions/49122170/zip-compressed-input-for-apache-flink
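
As a rough sketch of the core of the second option, the snippet below reads
the lines of the first entry of a .tar.gz with JDK classes only. It is not a
Flink input format: the class and method names are hypothetical, and it
assumes a plain ustar/GNU tar header, that the first entry is the data file,
and UTF-8 text. Archives that describe the entry through pax extended
headers are not handled.

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

/** Hypothetical sketch (not a Flink class): streams the lines of the first entry of a .tar.gz. */
final class SingleEntryTarGzReader {

    private static final int BLOCK_SIZE = 512;  // a tar header is one 512-byte block
    private static final int SIZE_OFFSET = 124; // start of the entry-size field in the header
    private static final int SIZE_LENGTH = 12;  // width of the entry-size field

    /** Decodes the entry size from a tar header block. */
    static long parseSize(byte[] header) {
        int end = SIZE_OFFSET + SIZE_LENGTH;
        long size = 0;
        if ((header[SIZE_OFFSET] & 0x80) != 0) {
            // GNU base-256 form for entries of 8 GiB and more: big-endian binary number.
            for (int i = SIZE_OFFSET + 1; i < end; i++) {
                size = (size << 8) | (header[i] & 0xFF);
            }
            return size;
        }
        int i = SIZE_OFFSET;
        while (i < end && header[i] == ' ') {
            i++; // some writers pad the field with leading spaces
        }
        // Classic form: octal ASCII digits, terminated by NUL or space.
        for (; i < end && header[i] >= '0' && header[i] <= '7'; i++) {
            size = size * 8 + (header[i] - '0');
        }
        return size;
    }

    /** Unzips the stream, consumes the first tar header, and hands each line of that entry on. */
    static void forEachLine(InputStream tarGz, Consumer<String> lineHandler) throws IOException {
        try (DataInputStream in =
                new DataInputStream(new BufferedInputStream(new GZIPInputStream(tarGz)))) {
            byte[] header = new byte[BLOCK_SIZE];
            in.readFully(header);
            long remaining = parseSize(header); // only this many bytes are file content
            ByteArrayOutputStream line = new ByteArrayOutputStream();
            while (remaining > 0) {
                int c = in.read();
                if (c < 0) {
                    throw new EOFException("archive ended inside the entry");
                }
                remaining--;
                if (c == '\n') {
                    lineHandler.accept(new String(line.toByteArray(), StandardCharsets.UTF_8));
                    line.reset();
                } else {
                    line.write(c);
                }
            }
            if (line.size() > 0) { // final line without a trailing newline
                lineHandler.accept(new String(line.toByteArray(), StandardCharsets.UTF_8));
            }
        }
    }
}
```

Stopping after the size taken from the header is what keeps the tar padding
and end-of-archive blocks out of the JSON parser. In a real job this logic
would live inside a custom input format along the lines of [1].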

On Mon, Dec 28, 2020 at 2:41 PM Billy Bain  wrote:

> We have an input file that is tarred and compressed to 12gb. It is about
> 50gb uncompressed.
>
> With readTextFile(), I see it uncompress the file but then flink doesn't
> seem to handle the untar portion. It's just a single file. (We don't
> control the input format)
>
> foo.tar.gz 12gb
> foo.tar  50gb
> then untar it and it is valid jsonl
>
> When reading, we get this exception:
>
> Caused by:
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException:
> Unrecognized token 'playstore': was expecting (JSON String, Number, Array,
> Object or token 'null', 'true' or 'false')
>  at [Source: UNKNOWN; line: 1, column: 10]
> at
> org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
>
> The process is seeing the header in the tar format and rightly complaining
> about the JSON format.
>
> Is it possible to untar this file using Flink?
>
> --
> Wayne D. Young
> aka Billy Bob Bain
> billybobb...@gmail.com
>




[CVE-2020-17518] Apache Flink directory traversal attack: remote file writing through the REST API

2021-01-05 Thread Robert Metzger
CVE-2020-17518: Apache Flink directory traversal attack: remote file
writing through the REST API

Vendor:
The Apache Software Foundation

Versions Affected:
1.5.1 to 1.11.2

Description:
Flink 1.5.1 introduced a REST handler that allows you to write an uploaded
file to an arbitrary location on the local file system, through a
maliciously modified HTTP HEADER. The files can be written to any location
accessible by Flink 1.5.1.

Mitigation:
All users should upgrade to Flink 1.11.3 or 1.12.0 if their Flink
instance(s) are exposed.
The issue was fixed in commit a5264a6f41524afe8ceadf1d8ddc8c80f323ebc4 from
apache/flink:master.

Credits:
This issue was discovered by 0rich1 of Ant Security FG Lab


[CVE-2020-17519] Apache Flink directory traversal attack: reading remote files through the REST API

2021-01-05 Thread Robert Metzger
CVE-2020-17519: Apache Flink directory traversal attack: reading remote
files through the REST API

Vendor:
The Apache Software Foundation

Versions Affected:
1.11.0, 1.11.1, 1.11.2

Description:
A change introduced in Apache Flink 1.11.0 (and released in 1.11.1 and
1.11.2 as well) allows attackers to read any file on the local filesystem
of the JobManager through the REST interface of the JobManager process.
Access is restricted to files accessible by the JobManager process.

Mitigation:
All users should upgrade to Flink 1.11.3 or 1.12.0 if their Flink
instance(s) are exposed.
The issue was fixed in commit b561010b0ee741543c3953306037f00d7a9f0801 from
apache/flink:master.

Credits:
This issue was discovered by 0rich1 of Ant Security FG Lab


Re: Re: checkpoint delay consume message

2021-01-05 Thread Arvid Heise
There seems to be a double-post with the mail "Long latency when consuming
a message from KAFKA and checkpoint is enabled". Let's continue discussion
there.

On Sun, Dec 27, 2020 at 8:33 AM nick toker  wrote:

> Hi,
> We think we are using the default values unless we are missing
> something.
> So this doesn't explain the problem we are facing.
> Could you please tell us how to choose synchronous or asynchronous
> checkpoints just to be sure we are using the correct configuration ?
> BR,
> Nick
>
> On Thu, Dec 24, 2020 at 3:36, lec ssmi <shicheng31...@gmail.com> wrote:
>
>> Checkpoints can be done synchronously or asynchronously; the latter is
>> the default.
>> If you choose the synchronous way, it may cause this problem.
>>
>> On Wed, Dec 23, 2020 at 3:53 PM, nick toker wrote:
>>
>>> Hi Yun,
>>>
>>> Sorry but we didn't understand your questions.
>>> The delay we are experiencing is on the *read* side.
>>> The message is written to kafka topic and consumed by flink with a delay
>>> that depends on the checkpoints interval
>>> When we disabled the checkpoints the messages are immediately consumed
>>> We use the EXACTLY-ONCE semantic.
>>>
>>> Please advise.
>>> BR,
>>> Nick
>>>
>>> On Tue, Dec 22, 2020 at 9:32, Yun Gao <yungao...@aliyun.com> wrote:
>>>
>>>> Hi nick,
>>>>
>>>> Sorry, I initially thought that the data was also written into Kafka with
>>>> Flink. So it can be ensured that there is no delay on the write side,
>>>> right? Does the delay on the read side keep existing?
>>>>
>>>> Best,
>>>> Yun
>>>>
>>>> --Original Mail --
>>>> Sender: nick toker
>>>> Send Date: Tue Dec 22 01:43:50 2020
>>>> Recipients: Yun Gao
>>>> CC: user
>>>> Subject: Re: checkpoint delay consume message
>>>>
>>>>> hi
>>>>>
>>>>> i am confused
>>>>>
>>>>> the delay is in the source when reading messages, not in the sink
>>>>>
>>>>> nick
>>>>>
>>>>> On Mon, Dec 21, 2020 at 18:12, Yun Gao <yungao...@aliyun.com> wrote:
>>>>>
>>>>>> Hi Nick,
>>>>>>
>>>>>> Are you using EXACTLY_ONCE semantics? If so, the sink would use
>>>>>> transactions and only commit the transaction on checkpoint completion to
>>>>>> ensure end-to-end exactly-once. A detailed description can be found in
>>>>>> [1].
>>>>>>
>>>>>> Best,
>>>>>> Yun
>>>>>>
>>>>>> [1]
>>>>>> https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html
>>>>>>
>>>>>> --
>>>>>> Sender: nick toker
>>>>>> Date: 2020/12/21 23:52:34
>>>>>> Recipient: user
>>>>>> Subject: checkpoint delay consume message
>>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We noticed the following behavior:
>>>>>> If we enable the Flink checkpoints, there is a delay between the time
>>>>>> we write a message to the KAFKA topic and the time the Flink Kafka
>>>>>> connector consumes this message.
>>>>>> The delay is closely related to checkpointInterval and/or
>>>>>> minPauseBetweenCheckpoints, meaning that the MAX delay when consuming a
>>>>>> message from KAFKA will be one of these parameters.
>>>>>>
>>>>>> Could you please advise how we can remove/control this delay?
>>>>>>
>>>>>> we use flink 1.11.2
>>>>>>
>>>>>> BR
>>>>>> nick
>>>>>>



Re: Long latency when consuming a message from KAFKA and checkpoint is enabled

2021-01-05 Thread Arvid Heise
Hi Nick,

I'm not entirely sure that I understand your setup correctly.

Basically, when enabling exactly once and checkpointing, Flink will only
consume messages that have been committed.
If you chain two Flink jobs with an intermediate Kafka topic, then the
first Flink job will only commit messages on checkpoints and thus the
second Flink job will only read these messages with a delay up to the
checkpoint interval.

Now if your input record is created with a different tool, make sure that
you commit it immediately. Then, Flink should immediately also process that
record. However, note that Flink again writes the record in a transaction.
Thus, if your tests involve you checking for the output, you would need to
configure your reader to read uncommitted data [1].

You can decrease the latency by decreasing the checkpointing interval. If
you have a need for very low latency, you might also check if you really
need exactly once (that's typically not necessary).

[1] https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
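
To illustrate the reader setting mentioned above, here is a minimal sketch of consumer properties for a test tool that should see the output before Flink commits its transaction. It only builds a plain java.util.Properties object; the class name, broker address and group id are placeholders I made up, while isolation.level and its two values come from the Kafka consumer documentation [1].

```java
import java.util.Properties;

/** Hypothetical helper: builds Kafka consumer settings without needing Kafka classes. */
final class TestReaderConfig {

    /**
     * Settings for a verification reader that should see records
     * before the producing transaction is committed.
     * bootstrapServers and groupId are placeholders supplied by the caller.
     */
    static Properties uncommittedReader(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Kafka documents two values: "read_uncommitted" and "read_committed".
        props.setProperty("isolation.level", "read_uncommitted");
        return props;
    }

    public static void main(String[] args) {
        Properties props = uncommittedReader("localhost:9092", "latency-check");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

These properties would then be passed to whatever consumer the test uses; a reader that must not see rolled-back data would set "read_committed" instead.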

On Mon, Dec 28, 2020 at 3:07 AM Danny Chan  wrote:

> Hi Nick,
> The behavior is as expected, because the Kafka source/sink relies on
> checkpoints to provide exactly-once write semantics. A checkpoint
> snapshots the state at a point in time, which is used for recovery. The
> Kafka sink currently writes to Kafka but only commits when a checkpoint
> completes.
>
> I guess you want more near-real-time writes while still keeping
> exactly-once semantics. I'm sorry to say that there is no infrastructure
> other than checkpoints that we can use for exactly-once semantics.
>
> On Sun, Dec 27, 2020 at 3:12 PM, nick toker wrote:
>
>> Hi
>>
>> any idea?
>> is it a bug?
>>
>>
>> Regards,
>> nick
>>
>> On Wed, Dec 23, 2020 at 11:10, nick toker <nick.toker@gmail.com> wrote:
>>
>>> Hello
>>>
>>> We noticed the following behavior:
>>> If we enable Flink checkpoints, there is a delay between
>>> the time we write a message to the Kafka topic and the time the Flink Kafka
>>> connector consumes this message.
>>> The delay is closely related to checkpointInterval and/or
>>> minPauseBetweenCheckpoints, meaning that the maximum delay when consuming a
>>> message from Kafka will be one of these parameters.
>>>
>>> If we disable the checkpoints, the message is immediately consumed
>>> We work with the EXACTLY_ONCE semantic
>>> Please note that we inject only one message
>>>
>>> Could you please advise how we can remove/control this delay?
>>>
>>> Please see the attached code of AbstractFetcher and KafkaFetcher (as a
>>> png file)
>>> (For example emitRecordsWithTimestamps() use a lock on checkpointLock).
>>> Could this explain the behaviour ?
>>>
>>>
>>> BR
>>>
>>



Re: Chaining 2 flink jobs through a KAFKA topic with checkpoint enabled

2021-01-05 Thread Arvid Heise
Hi Daniel,

Flink commits transactions on checkpoints while Kafka Streams/connect
usually commits on record. This is the typical tradeoff between Throughput
and Latency. By decreasing the checkpoint interval in Flink, you can reach
comparable latency to Kafka Streams.

If you have two exactly once jobs, the second job may only read data that
has been committed (not dirty as Chesnay said). If the second job were to
consume data that is uncommitted, it will result in duplicates, in case the
first job fails and rolls back.

You can configure the read behavior with isolation.level. If you want to
implement exactly once behavior, you also need to set that level in your
other Kafka consumers [1]. Also compare what Kafka Streams is setting if
you want to go exactly once [2].

If you really want low latency, please also double-check if you really need
exactly once.

[1] https://kafka.apache.org/documentation/#consumerconfigs_isolation.level
[2]
https://kafka.apache.org/documentation/streams/developer-guide/config-streams.html#processing-guarantee
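
The latency effect described above can be put into numbers with a small toy model. It is only a sketch under a strong simplification that I am assuming here: checkpoints complete exactly at multiples of the interval and the transaction is committed at that instant. The class and method names are invented for the illustration.

```java
/** Toy model (not Flink code) of when a transactionally written record becomes readable. */
final class CommitVisibility {

    /**
     * Returns the time at which a record written at writeTimeMs can be read.
     * Assumption: checkpoints complete at exact multiples of intervalMs and the
     * producer commits its transaction at that moment.
     */
    static long visibleAtMs(long writeTimeMs, long intervalMs, boolean readCommitted) {
        if (!readCommitted) {
            // A reader of uncommitted data sees the record as soon as it is written.
            return writeTimeMs;
        }
        long completedCheckpoints = writeTimeMs / intervalMs;
        // The record belongs to the transaction committed at the next checkpoint.
        return (completedCheckpoints + 1) * intervalMs;
    }

    public static void main(String[] args) {
        long interval = 10_000;
        System.out.println(visibleAtMs(2_500, interval, true) - 2_500);  // prints 7500
        System.out.println(visibleAtMs(2_500, interval, false) - 2_500); // prints 0
    }
}
```

In this model the delay for a committed-only reader is always greater than zero and at most one interval, which is why shrinking the checkpoint interval shrinks the observed latency.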

On Mon, Dec 28, 2020 at 12:22 PM Chesnay Schepler 
wrote:

> I don't know our Kafka connector particularly well, but it sounds like a
> matter of whether a given consumer does dirty reads.
> Flink does not, whereas the other tools you're using do.
>
> On 12/28/2020 7:57 AM, Daniel Peled wrote:
>
> Hello,
>
> We have 2 flink jobs that communicate with each other through a KAFKA
> topic.
> Both jobs use checkpoints with EXACTLY ONCE semantic.
>
> We have seen the following behaviour and we want to make sure and ask if
> this is the expected behaviour or maybe it is a bug.
>
> When the first job produces a message to Kafka, the message is consumed
> by the second job with a latency that depends on the *first* job's
> *checkpoint interval*.
>
> We are able to read the message using the Kafka tool or another
> Kafka consumer, but NOT with a Flink Kafka consumer, whose latency again
> depends on the checkpoint interval of the first job.
>
> How come the consumer of the second job depends on the producer
> transaction commit time of the first job ?
>
> BR,
> Danny
>
>
>



Re: Throwing Recoverable Exceptions from Tasks

2021-01-05 Thread Arvid Heise
A typical solution to your issue is to use an ELK stack to collect the logs
and define some filters on log events.

If it's specific to input data issues, I also found side-outputs useful to
store invalid data points. Then, you can simply monitor the side topic
(assuming Kafka) and already have the data points at hand to improve your
software/investigate the root cause.
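
As a rough illustration of the side-output idea: in Flink itself this would be done with an OutputTag and a ProcessFunction, but the sketch below is plain Java without any Flink dependency and only shows the routing logic. The class name, the two lists and the "parse an integer" validity check are assumptions made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java analogue of a main output plus a side output for invalid records. */
final class SideOutputSketch {

    final List<Integer> mainOutput = new ArrayList<>();   // successfully parsed records
    final List<String> invalidOutput = new ArrayList<>(); // raw records that failed parsing

    /** Routes one raw record instead of throwing and failing the whole job. */
    void process(String raw) {
        try {
            mainOutput.add(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            // Keep the original input so the root cause can be investigated later.
            invalidOutput.add(raw);
        }
    }

    public static void main(String[] args) {
        SideOutputSketch sketch = new SideOutputSketch();
        for (String raw : new String[] {"1", "x", " 2 "}) {
            sketch.process(raw);
        }
        System.out.println(sketch.mainOutput);
        System.out.println(sketch.invalidOutput);
    }
}
```

The invalid list plays the role of the side topic: it can be monitored or alerted on separately while the main pipeline keeps running.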

On Mon, Dec 28, 2020 at 12:14 PM Chesnay Schepler 
wrote:

> There is no way to have an exception appear in the REST API without
> restarting the job; that field is exactly defined as the exception causing
> the job to fail.
>
> Using asynchronous by itself is fine, so long as you don't wait for any
> confirmation. In any case you could remedy the issue by writing the alerts
> into a side-output and having a dedicated sink for submitting these alerts.
>
> On 12/28/2020 5:24 AM, Chirag Dewan wrote:
>
> Hi,
>
> I am building an alerting system where based on some input events I need
> to raise an alert from the user defined aggregate function.
>
> My first approach was to use an asynchronous REST API to send alerts
> outside the task slot. But this obviously involves IO from within the task
> and if I understand correctly, should be avoided.
>
> Another way is to use the Job Manager's *exceptions *REST API to pull the
> alerts from the Flink cluster. This however requires me to throw an
> exception from the operator which results in a job failure/restart.
>
> Is there any way I can throw exceptions that appear in the REST API but
> don't restart the job? This might be similar to how some of the internal
> exceptions like checkpointing related appear in the REST output.
>
> I plan on exploring the Queryable state for such use cases as well, but
> the evolving state might be a blocker.
>
> Thanks
> Chirag
>
>
>



Re: Some questions about limit push down

2021-01-05 Thread Arvid Heise
This is most likely a bug. Could you elaborate a bit on how it is invalid?
I'm also CCing Jark since he is one of the SQL experts.

On Mon, Dec 28, 2020 at 10:37 AM Jun Zhang 
wrote:

> When I query a Hive table with SQL like `select * from hivetable where
> id = 1 limit 1`, I found that the limit push down does not take effect. Is it a bug
> or was it designed like this?
>
> If the SQL is `select * from hivetable limit 1`, it works.
>
> thanks
>




Re: Flink 1.12.0 docker image MISSING

2021-01-05 Thread Chesnay Schepler
This is a known issue: FLINK-20632 



On 1/5/2021 11:27 AM, Alexandru Vasiu wrote:

Hi,

Docker image for flink 1.12 is missing from Docker Hub.

Thank you,
Alex Vasiu

ComplyAdvantage is a trading name of IVXS TECHNOLOGY ROMANIA. This 
message, including any attachments, is intended only for the use of 
the individual(s) to whom it is addressed and may contain information 
that is strictly privileged/confidential. Any other distribution, 
copying or disclosure is strictly prohibited. If you are not the 
intended recipient or have received this message in error, please 
notify the sender immediately by reply email and permanently delete 
this message including any attachments, without reading it or making a 
copy. Contact us . Website 
. 





Re: Realtime Data processing from HBase

2021-01-05 Thread Arvid Heise
Hi Sunitha,

The current HBase connector only works continuously with Table API/SQL. If
you use the input format, it only reads the data once as you have found out.

What you can do is to implement your own source that repeatedly polls data
and uses pagination or filters to poll only new data. You would add the
last read offset to the checkpoint data of your source.

If you are using Flink 1.12, I'd strongly recommend using the new source
interface [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/sources.html
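
The polling idea can be sketched without any Flink or HBase classes as follows. This is only an illustration under assumptions of mine: rows carry a monotonically increasing key (for example a timestamp), a TreeMap stands in for the HBase table, and snapshotOffset() stands in for what the source would write into its checkpoint. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy polling reader that remembers the last key it has read. */
final class PollingReaderSketch {

    private final NavigableMap<Long, String> table; // stand-in for the external table
    private long lastReadKey;                        // the offset that would be checkpointed

    PollingReaderSketch(NavigableMap<Long, String> table, long restoredOffset) {
        this.table = table;
        this.lastReadKey = restoredOffset;
    }

    /** Returns only rows with a key greater than the last one read. */
    List<String> poll() {
        List<String> fresh = new ArrayList<>();
        for (Map.Entry<Long, String> row : table.tailMap(lastReadKey, false).entrySet()) {
            fresh.add(row.getValue());
            lastReadKey = row.getKey();
        }
        return fresh;
    }

    /** Value to store in the checkpoint and to pass back in on restore. */
    long snapshotOffset() {
        return lastReadKey;
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> table = new TreeMap<>();
        table.put(1L, "a");
        table.put(2L, "b");
        PollingReaderSketch reader = new PollingReaderSketch(table, 0L);
        System.out.println(reader.poll());       // first poll returns both rows
        long offset = reader.snapshotOffset();   // simulate a checkpoint
        table.put(3L, "c");
        // Simulate a restart: a new reader resumes from the checkpointed offset.
        System.out.println(new PollingReaderSketch(table, offset).poll());
    }
}
```

A real source would call poll() in a loop with a pause between rounds, and would have to make sure the offset is stored consistently with the records already emitted.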

On Mon, Dec 28, 2020 at 6:43 AM s_penakalap...@yahoo.com <
s_penakalap...@yahoo.com> wrote:

> Thanks Deepak.
>
> Does this mean Streaming from HBase is not possible using current
> Streaming API?
>
> Could you also shed some light on HBase checkpointing? I referred to
> the URL below to implement checkpointing, but in the example I see that a count
> is passed in the SourceFunction ( SourceFunction). Is it possible to
> checkpoint based on the data we read from HBase?
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/connectors/twitter/TwitterSource.html
>
> Regards,
> Sunitha.
>
> On Monday, December 28, 2020, 10:51:45 AM GMT+5:30, Deepak Sharma <
> deepakmc...@gmail.com> wrote:
>
>
> I would suggest another approach here.
> 1. Write a job that reads from HBase, checkpoints, and pushes the data to a
> broker such as Kafka.
> 2. A Flink streaming job would be the second job, reading from Kafka and
> processing the data.
>
> With the separation of the concern as above , maintaining it would be
> simpler.
>
> Thanks
> Deepak
>
> On Mon, Dec 28, 2020 at 10:42 AM s_penakalap...@yahoo.com <
> s_penakalap...@yahoo.com> wrote:
>
> Hi Team,
>
> Kindly help me with some inputs.. I am using Flink 1.12.
>
> Regards,
> Sunitha.
>
> On Thursday, December 24, 2020, 08:34:00 PM GMT+5:30,
> s_penakalap...@yahoo.com  wrote:
>
>
> Hi Team,
>
> I recently encountered one usecase in my project as described below:
>
> My data source is HBase
> We receive huge volume of data at very high speed to HBase tables from
> source system.
> Need to read from HBase, perform computation and insert to postgreSQL.
>
> I would like few inputs on the below points:
>
> - Using the Flink streaming API, is continuous streaming possible from an
>   HBase database? I tried using RichSourceFunction and
>   StreamExecutionEnvironment and was able to read data, but the job stops
>   once all data is read from HBase. My requirement is that the job should
>   execute continuously and read data as and when it arrives in the HBase table.
> - If continuous streaming from HBase is supported, how can
>   checkpointing be done on HBase so that the job can be restarted from the
>   point where it aborted? I tried googling but had no luck. Please help
>   with any simple example or approach.
> - If continuous streaming from HBase is not supported, what should
>   be the alternative approach: a batch job? (Our requirement is to process
>   real-time data from HBase and not to launch multiple ETL jobs.)
>
>
> Happy Christmas to all  :)
>
>
> Regards,
> Sunitha.
>
>
>
> --
> Thanks
> Deepak
> www.bigdatabig.com
> www.keosha.net
>




Flink 1.12.0 docker image MISSING

2021-01-05 Thread Alexandru Vasiu
Hi,

Docker image for flink 1.12 is missing from Docker Hub.

Thank you,
Alex Vasiu



Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished

2021-01-05 Thread Arvid Heise
Hi Yun,

1. I'd think that this is an orthogonal issue, which I'd solve separately.
My gut feeling says that this is something we should only address for new
sinks where we decouple the semantics of commits and checkpoints
anyway. @Aljoscha Krettek, any idea on this one?

2. I'm not sure I get it completely. Let's assume we have a source
partition that is finished before the first checkpoint. Then, we would need
to store the finished state of the subtask somehow. So I'm assuming, we
still need to trigger some checkpointing code on finished subtasks.

3. Do we really want to store the finished flag in OperatorState? I was
assuming we want to have it more fine-grained on OperatorSubtaskState.
Maybe we can store the flag inside managed or raw state without changing
the format?



On Fri, Dec 25, 2020 at 8:39 AM Yun Gao  wrote:

> Hi all,
>
> I tested the previous PoC with the current tests and found some
> new issues that might cause divergence. Sorry, this may also reverse
> some conclusions on previously discussed problems:
>
>
> 1. Which operators should wait for one more checkpoint before close?
>
> One motivation for this FLIP is to ensure that the 2PC sink commits the
> last part of the data before it is closed, which means the sink operator needs
> to wait for one more checkpoint, like onEndOfInput() -> waitForCheckpoint() ->
> notifyCheckpointComplete() -> close(). This leads to the question of which
> operators should wait for a checkpoint. Possible options are:
>  a. Make all operators (or UDFs) that implement the
> notifyCheckpointComplete method wait for one more checkpoint. One
> exception: since we can only snapshot one or all tasks of a legacy
> source operator to avoid data repetition [1], we could not support legacy
> operators and their chained operators waiting for checkpoints, because there
> would be a deadlock if part of the tasks are finished. This would finally be
> solved once legacy sources are deprecated. The PoC uses this option for now.
>  b. Make operators (or UDFs) that implement a special marker
> interface wait for one more checkpoint.
>
>
> 2. Do we need to handle the case where tasks finish before being triggered?
>
> Previously I thought we could postpone this. However, during testing I
> found that it might cause problems, since by default a checkpoint failure
> causes a job failover, and the job would also need to wait for another
> interval to trigger the next checkpoint. To pass the tests, I updated the
> PoC to include this part, and we may want to think again about whether to
> include it or use some other option.
>
> 3. How to extend the checkpoint metadata with a new format?
>
> Sorry, I previously gave a wrong estimate. After extracting a
> sub-component for (de)serializing operator state, I found that the problem just
> moves to the new OperatorStateSerializer. The problem seems to be that v2,
> v3 and v4 have different fields and thus use different processes when
> (de)serializing, which is a bit different from the case where we have fixed
> steps and each step has different logic. Thus we might either
>  a. use base classes for each pair of versions, or
>  b. have a unified framework that contains all the possible fields
> across all versions, and use an empty field serializer to skip some fields in
> each version.
>
> Best,
> Yun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished#FLIP147:SupportCheckpointsAfterTasksFinished-Option3.Allowtaskstofinish&Checkpointsdonotcontainthefinalstatesfromfinishedtasks
>
> --
> From:Yun Gao 
> Send Time:2020 Dec. 16 (Wed.) 11:07
> To:Aljoscha Krettek ; dev ;
> user 
> Subject:Re: [DISCUSS] FLIP-147: Support Checkpoints After Tasks Finished
>
>  Hi Aljoscha,
>
> Many thanks for the feedback! For the remaining issues:
>
>
>   > 1. You mean we would insert "artificial" barriers for barrier 2 in 
> case we receive  EndOfPartition while other inputs have already received 
> barrier 2? I think that makes sense, yes.
>
>
>   Yes, exactly. I would like to insert "artificial" barriers in case
> we receive EndOfPartition while other inputs have already received barrier
> 2, and also for similar cases: when some input channels receive
> EndOfPartition while checkpoint 2 is ongoing, and when the task receives
> a checkpoint trigger directly after all the preceding tasks have finished but
> has not received their EndOfPartition yet.
>
>
>  > 3. This indeed seems complex. Maybe we could switch to using 
> composition instead of inheritance to make this more extensible?
>
>
> I re-checked the code, and now I think composition would be better to
> avoid a complex inheritance hierarchy by exposing the changed part,
> `(de)serializeOperatorState`. I'll update the PoC to change this
> part. Many thanks for the suggestions!
>
>
>> 4. Don't we currently have the sam

Re: Flink + DDL reading from Kafka produces no output, although the Kafka consumer receives messages

2021-01-05 Thread Arvid Heise
Note that you posted to the English-speaking mailing list. For the
Chinese-speaking version please use user...@flink.apache.org.

On Thu, Dec 24, 2020 at 3:39 PM Appleyuchi  wrote:

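> This is Flink 1.12. The Kafka consumer side can read the data, but the code
> below cannot read any data; after running it there is no error and no output.
> Please help, thanks.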
>
>
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.{EnvironmentSettings, Table}
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.types.Row
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
>
> import scala.math.Ordering.Int
>
> object FlinkKafkaDDLDemo
> {
>
>   def main(args: Array[String]): Unit =
>   {
>
>     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
>
>     env.setParallelism(3)
>
>     val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val bsSettings = EnvironmentSettings.newInstance()
>       .useBlinkPlanner()
>       .inStreamingMode()
>       .build()
>     val tEnv = StreamTableEnvironment.create(bsEnv, bsSettings)
>
>     val createTable =
>       """
>       |CREATE TABLE PERSON (
>       |name VARCHAR COMMENT 'name',
>       |age VARCHAR COMMENT 'age',
>       |city VARCHAR COMMENT 'city',
>       |address VARCHAR COMMENT 'home address',
>       |ts TIMESTAMP(3) COMMENT 'timestamp'
>       |)
>       |WITH (
>       |'connector.type' = 'kafka', -- use the kafka connector
>       |'connector.version' = 'universal',  -- kafka version
>       |'connector.topic' = 'kafka_ddl',  -- kafka topic
>       |'connector.startup-mode' = 'earliest-offset', -- start reading from the earliest offset
>       |'connector.properties.0.key' = 'zookeeper.connect',  -- connection info
>       |'connector.properties.0.value' = 'Desktop:2181',
>       |'connector.properties.1.key' = 'bootstrap.servers',
>       |'connector.properties.1.value' = 'Desktop:9091',
>       |'update-mode' = 'append',
>       |'format.type' = 'json',  -- source format is json
>       |'format.derive-schema' = 'true' -- derive the json parsing rules from the DDL schema
>       |)
>       """.stripMargin
>
>     tEnv.executeSql(createTable)
>
>     val query: String = """SELECT name,COUNT(age) FROM PERSON GROUP BY name""".stripMargin
>
>     val result: Table = tEnv.sqlQuery(query)
>
>     tEnv.toRetractStream[Row](result).print()
>     //tEnv.execute("Flink SQL DDL")
>
>   }
>
> }
>




Re: using DefaultScalaModule

2021-01-05 Thread Arvid Heise
Hi Debasish,

The idea of shading is actually to hide Flink's dependencies from
users, so that they can use their own dependencies with appropriate versions.

That means you add jackson with jackson-module-scala to your application
jar without worrying about Flink's jackson at all (just treat it as an
implementation detail).

On Tue, Dec 22, 2020 at 12:53 PM Debasish Ghosh 
wrote:

> Hello -
>
> Flink 1.11 uses a shaded version of Jackson for serialization, which does
> not support jackson-module-scala. I need to register DefaultScalaModule for
> some Scala object serialization through Jackson. But when I do a
> mapper.registerModule(DefaultScalaModule), I get the following compilation
> error ..
>
> type mismatch;
> found   : com.fasterxml.jackson.module.scala.DefaultScalaModule.type
> required:
> org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.Module
>
> Is there any way I can fix this?
>
> regards.
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>

