Problem starting taskexecutor daemons in 3 node cluster

2019-09-10 Thread Komal Mariam
I'm trying to set up a 3 node Flink cluster (version 1.9) on the following
machines:

Node 1 (Master): 4 GB (3.8 GB), Core2 Duo 2.80GHz, Ubuntu 16.04 LTS
Node 2 (Slave): 16 GB, Core i7 3.40GHz, Ubuntu 16.04 LTS
Node 3 (Slave): 16 GB, Core i7 3.40GHz, Ubuntu 16.04 LTS

I have followed the instructions on:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/cluster_setup.html

I have defined the IP/address for "jobmanager.rpc.address" in
conf/flink-conf.yaml in the following format: master@master-node1-hostname

conf/slaves contains:
slave@slave-node2-hostname
slave@slave-node3-hostname
master@master-node1-hostname (using the master machine for task execution too)
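
For comparison, the cluster setup guide linked above describes these two
files in roughly this form (host names here are placeholders;
jobmanager.rpc.address takes a plain host name or IP, and conf/slaves lists
one worker host per line):

# conf/flink-conf.yaml
jobmanager.rpc.address: master-node1-hostname

# conf/slaves
master-node1-hostname
slave-node2-hostname
slave-node3-hostname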


However, my problem is that when running bin/start-cluster.sh on the Master
node, it fails to start the taskexecutor daemon on both Slave nodes. It only
starts the taskexecutor daemon and the standalonesession daemon on
master@master-node1-hostname (Node 1).

I have tried both passwordless ssh and password ssh on all machines, but the
result is the same. In the latter case, it does ask for the
slave@slave-node2-hostname and slave@slave-node3-hostname passwords, but it
fails to display any message like "starting taskexecutor daemon on " after
that.

I switched my master node to Node 2 and set Node 1 as a slave. It was able to
start taskexecutor daemons on both Node 2 and Node 3 successfully, but did
nothing for Node 1.

I'd appreciate it if you could advise on what the problem here could be and
how I can resolve it.

Best Regards,
Komal


Re: Checkpointing is not performing well

2019-09-10 Thread Vijay Bhaskar
You have gone way past the upper limits of Flink's checkpointing system.
Try to distribute events evenly over time by adding some sort of controlled
back pressure after receiving data from the Kinesis streams. Otherwise the
spikes arriving within a 5-second window will always create problems, and
tomorrow the volume may double. So the best solution in your case is to
deliver messages at a configurable, constant rate after receiving them from
the Kinesis streams; otherwise I am sure this will always be a problem,
whatever kind of streaming engine you use. Tune your configuration to find
the optimal rate so that Flink's checkpoint state stays healthy.
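
One way to approximate such a constant delivery rate is a blocking rate
limiter placed right after the Kinesis source. A rough sketch (assuming
Guava is on the classpath; the target rate and the element type are
placeholders, not recommendations):

import com.google.common.util.concurrent.RateLimiter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Throttles each parallel subtask to a fixed number of records per second,
// creating back pressure toward the source instead of letting spikes flow
// into the stateful, checkpointed operators.
public class ThrottlingMap<T> extends RichFlatMapFunction<T, T> {

    private final double recordsPerSecondPerSubtask;
    private transient RateLimiter rateLimiter;

    public ThrottlingMap(double recordsPerSecondPerSubtask) {
        this.recordsPerSecondPerSubtask = recordsPerSecondPerSubtask;
    }

    @Override
    public void open(Configuration parameters) {
        // RateLimiter is not serializable, so it is created per subtask here
        rateLimiter = RateLimiter.create(recordsPerSecondPerSubtask);
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        rateLimiter.acquire();   // blocks until a permit is available
        out.collect(value);
    }
}

// usage (rate value is illustrative): kinesisStream.flatMap(new ThrottlingMap<>(50_000));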

Regards
Bhaskar

On Tue, Sep 10, 2019 at 11:16 PM Ravi Bhushan Ratnakar <
ravibhushanratna...@gmail.com> wrote:

> @Rohan - I am streaming data to a Kafka sink after applying the business
> logic. For checkpoints, I am using S3 as the distributed file system. For
> local recovery, I am using optimized-IOPS EBS volumes.
>
> @Vijay - I forgot to mention that the incoming data volume is ~10 to 21 GB
> per minute of compressed (lz4) Avro messages. Generally 90% of correlated
> events come within 5 seconds and 10% of the correlated events get extended
> to 65 minutes. Due to this business requirement, the state size keeps
> growing until 65 minutes, after which the state size becomes more or less
> stable. As the state size grows and is around 350 GB at peak load, the
> checkpoint is not able to complete within 1 minute. I want to checkpoint as
> quickly as possible, e.g. every 5 seconds.
>
> Thanks,
> Ravi
>
>
> On Tue 10 Sep, 2019, 11:37 Vijay Bhaskar, 
> wrote:
>
>> To me the task count seems huge for the mentioned resources. To rule out
>> the possibility of an issue with the state backend, can you start writing
>> the sink data to a no-op sink, i.e., a sink that ignores the data, and try
>> whether you can run for a longer duration without any issue? You can then
>> start decreasing the task manager count until you find a decent count that
>> has no side effects. Use that value as the task manager count and then
>> start adding your state backend; first you can try with RocksDB. With a
>> reduced task manager count you might get good results.
>>
>> Regards
>> Bhaskar
>>
>> On Sun, Sep 8, 2019 at 10:15 AM Rohan Thimmappa <
>> rohan.thimma...@gmail.com> wrote:
>>
>>> Ravi, have you looked at the I/O operation (IOPS) rate of the disk? You
>>> can monitor the IOPS performance and tune it according to your workload.
>>> This helped us in our project when we hit a wall after tuning pretty much
>>> all the other parameters.
>>>
>>> Rohan
>>>
>>>
>>> --
>>> *From:* Ravi Bhushan Ratnakar 
>>> *Sent:* Saturday, September 7, 2019 5:38 PM
>>> *To:* Rafi Aroch
>>> *Cc:* user
>>> *Subject:* Re: Checkpointing is not performing well
>>>
>>> Hi Rafi,
>>>
>>> Thank you for your quick response.
>>>
>>> I have tested with the RocksDB state backend. RocksDB required
>>> significantly more task managers to perform comparably to the filesystem
>>> state backend. The problem here is that the checkpoint process is not fast
>>> enough to complete.
>>>
>>> Our requirement is to checkpoint as soon as possible, e.g. within 5
>>> seconds, to flush the output to the output sink. As the incoming data rate
>>> is high, it is not able to complete quickly. If I increase the checkpoint
>>> interval, the state size grows much faster and hence takes much longer to
>>> complete checkpointing. I also tried the AT LEAST ONCE mode, but it does
>>> not improve things much. Adding more task managers to increase parallelism
>>> also does not improve the checkpointing performance.
>>>
>>> Is it possible to achieve checkpointing as short as 5 seconds with such a
>>> high input volume?
>>>
>>> Regards,
>>> Ravi
>>>
>>> On Sat 7 Sep, 2019, 22:25 Rafi Aroch,  wrote:
>>>
 Hi Ravi,

 Consider moving to the RocksDB state backend, where you can enable
 incremental checkpointing. This will make your checkpoint size stay pretty
 much constant even when your state becomes larger.


 https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend


 Thanks,
 Rafi

 On Sat, Sep 7, 2019, 17:47 Ravi Bhushan Ratnakar <
 ravibhushanratna...@gmail.com> wrote:

> Hi All,
>
> I am writing a streaming application using Flink 1.9. This application
> consumes data from a Kinesis stream whose payload is basically Avro. The
> application uses a KeyedProcessFunction to execute business logic on the
> basis of a correlation id, using event time characteristics, with the
> configuration below:
> StateBackend - filesystem with S3 storage
> registerTimeTimer duration for each key is  -  currentWatermark  + 15
> seconds
> checkpoint interval - 1min
> minPauseBetweenCheckpointInterval - 1 min
> checkpoint timeout - 10mins
>
> incoming data rate from kinesis -  ~10 to 21GB/min
>
> Number of Task manager - 200 (r4.2xlarge -> 8cpu,61GB)
>
> First 2-4 

Re: Flink 1.9 SQL submission failure

2019-09-10 Thread 越张
Yes, that was the problem. The jar was actually packaged inside the fat jar but still could not be found; putting the jar into the flink lib directory fixed it, which is strange.
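
If the factory class is inside the fat jar but still cannot be found, one
common cause is that the shade plugin does not merge the META-INF/services
files that Flink's TableFactory discovery relies on. A hedged sketch of the
relevant maven-shade-plugin configuration (the plugin version is
illustrative):

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.2.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>shade</goal></goals>
      <configuration>
        <transformers>
          <!-- merges the META-INF/services/org.apache.flink.table.factories.TableFactory
               entries of all connector jars instead of keeping only one of them -->
          <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>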

> On Sep 11, 2019, at 9:35 AM, Dian Fu  wrote:
> 
> Judging from your error, Kafka010TableSourceSinkFactory is not on the classpath. You need to put the kafka 
> connector jar (for 0.10 that means flink-connector-kafka-0.10_2.11 or flink-connector-kafka-0.10_2.12) into your dependencies.
> 
> 
>> On Sep 10, 2019, at 12:31 PM, 越张  wrote:
>> 
>> Code:
>> EnvironmentSettings bsSettings = 
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamExecutionEnvironment streamEnv = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> TableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
>> bsSettings);
>> 
>> tableEnv.connect(new Kafka()
>>   .version("0.10")
>>   .topic("installmentdb_t_user")
>>   .startFromEarliest()
>>   .property("zookeeper.connect", "risk-kafka.aku:2181")
>>   .property("bootstrap.servers", "risk-kafka.aku:9092"))
>>   .withFormat(new Json().deriveSchema())
>>   .withSchema(new Schema()
>>   .field("business", Types.STRING)
>>   .field("type", Types.STRING)
>>   .field("es", Types.LONG)
>>   )
>>   .inAppendMode().registerTableSource("installmentdb_t_user");
>> 
>> 
>> 
>> 
>> Starting execution of program
>> 
>> 
>> The program finished with the following exception:
>> 
>> org.apache.flink.client.program.ProgramInvocationException: The main method 
>> caused an error: findAndCreateTableSource failed.
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>  at 
>> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
>>  at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
>>  at 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
>>  at java.security.AccessController.doPrivileged(Native Method)
>>  at javax.security.auth.Subject.doAs(Subject.java:422)
>>  at 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1917)
>>  at 
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>  at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
>> Caused by: org.apache.flink.table.api.TableException: 
>> findAndCreateTableSource failed.
>>  at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
>>  at 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
>>  at 
>> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
>>  at feature.flinktask.sqltest.main(sqltest.java:39)
>>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>  at 
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>  at 
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>  at java.lang.reflect.Method.invoke(Method.java:498)
>>  at 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>  ... 12 more
>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
>> not find a suitable table factory for 
>> 'org.apache.flink.table.factories.TableSourceFactory' in
>> the classpath.
>> 
>> Reason: No context matches.
>> 
>> The following properties are requested:
>> connector.properties.0.key=zookeeper.connect
>> connector.properties.0.value=risk-kafka.aku:2181
>> connector.properties.1.key=bootstrap.servers
>> connector.properties.1.value=risk-kafka.aku:9092
>> connector.property-version=1
>> connector.startup-mode=earliest-offset
>> connector.topic=installmentdb_t_user
>> connector.type=kafka
>> connector.version=0.10
>> format.derive-schema=true
>> format.property-version=1
>> format.type=json
>> schema.0.name=business
>> schema.0.type=VARCHAR
>> schema.1.name=type
>> schema.1.type=VARCHAR
>> schema.2.name=es
>> schema.2.type=BIGINT
>> update-mode=append
>> 
>> The following factories have been considered:
>> org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> org.apache.flink.table.sinks.CsvBatchTableSinkFactory
>> org.apache.flink.table.sinks.CsvAppendTableSinkFactory
>> org.apache.flink.table.planner.StreamPlannerFactory
>> org.apache.flink.table.executor.StreamExecutorFactory
>> 

error: Static methods in interface require -target:jvm-1.8 using scala 2.11

2019-09-10 Thread Ben Yan
The following is the environment I use:
1. flink.version: 1.9.0
2. java version "1.8.0_212"
3. scala version: 2.11.12

When I wrote the following code in Scala, I got
the following error:

// set up the batch execution environment
val bbSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build()
val bbTableEnv = TableEnvironment.create(bbSettings)

error: Static methods in interface require -target:jvm-1.8
[ERROR] val bbTableEnv = TableEnvironment.create(bbSettings)

But when I use Java, or Scala 2.12, there is no problem.

If I stay on Scala 2.11, is there any way to solve this
problem? Thanks.


Best,

Ben
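
The error message itself points at the scalac target level, so one possible
fix (hedged, since the build tool is not shown in this thread) is to raise
the target explicitly:

sbt:    scalacOptions += "-target:jvm-1.8"
Maven:  add <arg>-target:jvm-1.8</arg> under the scala-maven-plugin's
        <configuration><args>...</args></configuration> section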


How to express early or delayed window firing in Flink SQL

2019-09-10 Thread 苏 欣
The Blink documentation describes an EMIT strategy: syntax like WITH DELAY '1' MINUTE BEFORE WATERMARK or EMIT 
WITHOUT DELAY AFTER WATERMARK can be used to control when a window fires.
But when I run a job using this syntax it fails with a SQL parse error. Is there any way to control window firing in SQL?
Table result = tEnv.sqlQuery("select " +
"count(*) " +
"from dept group by tumble(crt_time, INTERVAL '10' SECOND) WITH 
DELAY '1' MINUTE BEFORE WATERMARK");
Error:
Exception in thread "main" org.apache.flink.table.api.SqlParserException:

ERR_ID:
 SQL-00120001
CAUSE:
 SQL parse failed:
 Encountered "WITH" at line 1, column 75.
 Was expecting one of:
 
 "ORDER" ...
 "LIMIT" ...
 "OFFSET" ...
 "FETCH" ...
 "," ...

Sent from Mail for Windows 10



Filter events based on future events

2019-09-10 Thread theo.diefent...@scoop-software.de
Hi there,

I have the following use case: I get transaction logs from multiple servers. Each server puts its logs into its own Kafka partition, so that within each partition the elements are monotonically ordered by time.

Within the stream of transactions we have some special events. Let's call them A (roughly 1-10% of the distribution have this type). An A event can have an Anti-A event later on in time, that is, an event which has all the same attributes (like username, faculty, ...) but differs in one boolean attribute indicating that it is an anti event. Kind of a retraction.

Now I want to emit almost all events downstream (including those that are neither A nor Anti-A; let's call them simply B), preserving the monotonic order of events. There is just one special case in which I want to filter out an element: if the stream has an A event followed by an Anti-A event within one minute, only the Anti-A event shall go downstream, not A itself. But if there is no Anti-A event, A shall be emitted and shall still be within the timestamp order of events.

I'm wrangling my head around this a lot and haven't come up with a proper (performant) solution. It seems obvious that in the end I need to buffer all records for one minute so that the order can be preserved, but I have no idea how to implement this efficiently in Flink. My thoughts so far:

1. I could give CEP a try. But with CEP I would need to write something like "match all B events in any case, and match A as well but only if there is no Anti-A" => doesn't that produce a lot of state? And are all B events considered in the breadth-first rule-match approach, i.e. tons of unnecessary comparisons against A? Any pseudo code on how I could do this with CEP?

2. If I key the data by partition and all other attributes except for the retract boolean, so that A and Anti-A always fall into the same keyed stream but no other event does, I probably get much better comparison capabilities. But how much overhead do I produce with that? Will Flink reshuffle the data even if the first key stays the same? And can I repartition back to my "global" per-partition order? Note that some events have the exact same event time timestamp, but I still want to keep them in their original order later on.

3. Could I work with session windows somehow? Putting A and Anti-A in the same session, and on window emit I would just not collect the A event if there is an Anti-A. Would that be more or less overhead compared to CEP?

4. Do you have any other idea on how to approach this?

Sadly, I have no way to manipulate the input stream, so that part of the pipeline is fixed.

Best regards
Theo
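
One possible shape for option 2, as a rough sketch: key the stream by all
attributes except the retraction flag, buffer an A event for up to one
minute, and emit it only if no Anti-A arrives in that window. The Event type
and its isA()/isAntiA()/getTimestamp() accessors are hypothetical, the key
type is illustrative, it handles a single outstanding A per key, and it does
not by itself restore the global per-partition order (that would still need
a separate buffering/sorting step downstream).

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class RetractionFilter extends KeyedProcessFunction<String, Event, Event> {

    private transient ValueState<Event> pendingA;

    @Override
    public void open(Configuration parameters) {
        pendingA = getRuntimeContext().getState(
                new ValueStateDescriptor<>("pendingA", Event.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        if (event.isAntiA()) {
            pendingA.clear();        // drop the buffered A, if any
            out.collect(event);      // forward only the retraction
        } else if (event.isA()) {
            pendingA.update(event);  // wait up to one minute for a possible Anti-A
            ctx.timerService().registerEventTimeTimer(event.getTimestamp() + 60_000L);
        } else {
            out.collect(event);      // B events pass through unchanged
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Event bufferedA = pendingA.value();
        if (bufferedA != null) {     // no Anti-A arrived within one minute
            out.collect(bufferedA);
            pendingA.clear();
        }
    }
}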

Re: Checkpointing is not performing well

2019-09-10 Thread Ravi Bhushan Ratnakar
@Rohan - I am streaming data to a Kafka sink after applying the business
logic. For checkpoints, I am using S3 as the distributed file system. For
local recovery, I am using optimized-IOPS EBS volumes.

@Vijay - I forgot to mention that the incoming data volume is ~10 to 21 GB
per minute of compressed (lz4) Avro messages. Generally 90% of correlated
events come within 5 seconds and 10% of the correlated events get extended
to 65 minutes. Due to this business requirement, the state size keeps
growing until 65 minutes, after which the state size becomes more or less
stable. As the state size grows and is around 350 GB at peak load, the
checkpoint is not able to complete within 1 minute. I want to checkpoint as
quickly as possible, e.g. every 5 seconds.

Thanks,
Ravi


On Tue 10 Sep, 2019, 11:37 Vijay Bhaskar,  wrote:

> To me the task count seems huge for the mentioned resources. To rule out
> the possibility of an issue with the state backend, can you start writing
> the sink data to a no-op sink, i.e., a sink that ignores the data, and try
> whether you can run for a longer duration without any issue? You can then
> start decreasing the task manager count until you find a decent count that
> has no side effects. Use that value as the task manager count and then
> start adding your state backend; first you can try with RocksDB. With a
> reduced task manager count you might get good results.
>
> Regards
> Bhaskar
>
> On Sun, Sep 8, 2019 at 10:15 AM Rohan Thimmappa 
> wrote:
>
>> Ravi, have you looked at the I/O operation (IOPS) rate of the disk? You
>> can monitor the IOPS performance and tune it according to your workload.
>> This helped us in our project when we hit a wall after tuning pretty much
>> all the other parameters.
>>
>> Rohan
>>
>>
>> --
>> *From:* Ravi Bhushan Ratnakar 
>> *Sent:* Saturday, September 7, 2019 5:38 PM
>> *To:* Rafi Aroch
>> *Cc:* user
>> *Subject:* Re: Checkpointing is not performing well
>>
>> Hi Rafi,
>>
>> Thank you for your quick response.
>>
>> I have tested with the RocksDB state backend. RocksDB required
>> significantly more task managers to perform comparably to the filesystem
>> state backend. The problem here is that the checkpoint process is not fast
>> enough to complete.
>>
>> Our requirement is to checkpoint as soon as possible, e.g. within 5
>> seconds, to flush the output to the output sink. As the incoming data rate
>> is high, it is not able to complete quickly. If I increase the checkpoint
>> interval, the state size grows much faster and hence takes much longer to
>> complete checkpointing. I also tried the AT LEAST ONCE mode, but it does
>> not improve things much. Adding more task managers to increase parallelism
>> also does not improve the checkpointing performance.
>>
>> Is it possible to achieve checkpointing as short as 5 seconds with such a
>> high input volume?
>>
>> Regards,
>> Ravi
>>
>> On Sat 7 Sep, 2019, 22:25 Rafi Aroch,  wrote:
>>
>>> Hi Ravi,
>>>
>>> Consider moving to the RocksDB state backend, where you can enable
>>> incremental checkpointing. This will make your checkpoint size stay pretty
>>> much constant even when your state becomes larger.
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend
>>>
>>>
>>> Thanks,
>>> Rafi
>>>
>>> On Sat, Sep 7, 2019, 17:47 Ravi Bhushan Ratnakar <
>>> ravibhushanratna...@gmail.com> wrote:
>>>
 Hi All,

 I am writing a streaming application using Flink 1.9. This application
 consumes data from a Kinesis stream whose payload is basically Avro. The
 application uses a KeyedProcessFunction to execute business logic on the
 basis of a correlation id, using event time characteristics, with the
 configuration below:
 StateBackend - filesystem with S3 storage
 registerTimeTimer duration for each key is  -  currentWatermark  + 15
 seconds
 checkpoint interval - 1min
 minPauseBetweenCheckpointInterval - 1 min
 checkpoint timeout - 10mins

 incoming data rate from kinesis -  ~10 to 21GB/min

 Number of Task manager - 200 (r4.2xlarge -> 8cpu,61GB)

 The first 2-4 checkpoints complete within 1 minute, when the state size is
 usually 50 GB. As the state size grows beyond 50 GB, checkpointing starts
 taking more than 1 minute, increases up to 10 minutes, and then the
 checkpoint fails. The moment a checkpoint takes more than 1 minute to
 complete, the application starts processing slowly and lagging in its
 output.

 Any suggestion to fine tune checkpoint performance would be highly
 appreciated.

 Regards,
 Ravi

>>>


Re: Flink SQL: How to tag a column as 'rowtime' from a Avro based DataStream?

2019-09-10 Thread Fabian Hueske
Hi,

that would be regular SQL cast syntax:

SELECT a, b, c, CAST(eventTime AS TIMESTAMP) FROM ...
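
For instance, in context (a sketch reusing the TestStream table registered
earlier in this thread; the column names are illustrative):

Table result = tableEnv.sqlQuery(
    "SELECT letter, counter, CAST(EventTime AS TIMESTAMP) AS EventTime FROM TestStream");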


On Tue, Sep 10, 2019 at 6:07 PM Niels Basjes wrote:

> Hi.
>
> Can you give me an example of the actual syntax of such a cast?
>
> On Tue, 10 Sep 2019, 16:30 Fabian Hueske,  wrote:
>
>> Hi Niels,
>>
>> I think (not 100% sure) you could also cast the event time attribute to
>> TIMESTAMP before you emit the table.
>> This should remove the event time property (and thereby the
>> TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output
>> types.
>>
>> Best, Fabian
>>
>> On Wed, Aug 21, 2019 at 10:51 AM Niels Basjes wrote:
>>
>>> Hi,
>>>
>>> It has taken me quite a bit of time to figure this out.
>>> This is the solution I have now (works on my machine).
>>>
>>> Please tell me where I can improve this.
>>>
>>> Turns out that the schema you provide for registerDataStream only needs
>>> the 'top level' fields of the Avro datastructure.
>>> With only the top fields there you can still access nested fields with
>>> something like "topfield.x.y.z" in the SQL statement.
>>>
>>> What I found is that the easiest way to make this all work is to ensure
>>> the rowtime field in the structure is at the top level (which makes sense
>>> in general) and generate the fields string where I only need to know the
>>> name of the "rowtime" field.
>>>
>>> So I have
>>>
>>> DataStream inputStream = ...
>>>
>>>
>>> then I register the stream with
>>>
>>>
>>> TypeInformation typeInformation = 
>>> TypeInformation.of(Measurement.class);
>>> String [] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>>>
>>> List rootSchema = new ArrayList<>();
>>> for (String fieldName: fieldNames) {
>>> if (rowtimeFieldName.equals(fieldName)) {
>>> rootSchema.add(fieldName + ".rowtime");
>>> } else {
>>> rootSchema.add(fieldName);
>>> }
>>> }
>>>
>>> tableEnv.registerDataStream("MeasurementStream", inputStream, 
>>> String.join(",", rootSchema));
>>>
>>>
>>> Now after the actual SQL has been executed I have a
>>>
>>> Table resultTable = ...
>>>
>>> Now simply feeding this into a DataStream with something like this fails
>>> badly.
>>>
>>> TypeInformation tupleType = new 
>>> RowTypeInfo(resultTable.getSchema().getFieldTypes());
>>> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
>>> tupleType);
>>>
>>> will result in
>>>
>>> org.apache.flink.table.api.TableException: The time indicator type is 
>>> an internal type only.
>>>at 
>>> org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>>>
>>> Turns out that the schema of the output contains a field that was
>>> created by TUMBLE_START which is of type TimeIndicatorTypeInfo
>>>
>>> So I have to do it this way (NASTY!):
>>>
>>> final TypeInformation[] fieldTypes = 
>>> resultTable.getSchema().getFieldTypes();
>>> int index;
>>> for(index = 0 ; index < fieldTypes.length ; index++) {
>>> if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>>>fieldTypes[index] = SQL_TIMESTAMP;
>>> }
>>> }
>>> TypeInformation tupleType = new RowTypeInfo(fieldTypes);
>>> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
>>> tupleType);
>>>
>>> Which gives me the desired DataStream.
>>>
>>>
>>> Niels Basjes
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther  wrote:
>>>
 Hi Niels,

 if you are coming from DataStream API, all you need to do is to write a
 timestamp extractor.

 When you call:

 tableEnv.registerDataStream("TestStream", letterStream,
 "EventTime.rowtime, letter, counter");

 The ".rowtime" means that the framework will extract the rowtime from
 the stream record timestamp. You don't need to name all fields again but
 could simply construct a string from
 letterStream.getTypeInfo().getFieldNames(). I hope we can improve this
 further in the future as part of FLIP-37.

 Regards,
 Timo

 On 14.08.19 at 17:00, Niels Basjes wrote:

 Hi,

 Experimenting with the StreamTableEnvironment I build something like
 this:

 DataStream> letterStream = ...
 tableEnv.registerDataStream("TestStream", letterStream,
 "EventTime.rowtime, letter, counter");


 Because the "EventTime" was tagged with ".rowtime" it is now being used
 as the rowtime and has the DATETIME so I can do this

 TUMBLE_START(eventTime, INTERVAL '1' MINUTE)


 So far so good.

 Working towards a more realistic scenario I have a source that produces
 a stream of records that have been defined using Apache Avro.

 So I have a Measurement.avdl that (among other things) contains
 something like this:

 record Measurement {
/** The time (epoch in 

Re: Error While Building Flink From Source

2019-09-10 Thread Yuval Itzchakov
Never mind, it turns out it was an error on my part. Somehow I managed to add
an "S" to an attribute mistakenly :)

On Tue, Sep 10, 2019 at 7:29 PM Yuval Itzchakov  wrote:

> Still getting the same error message using your command. Which Maven
> version are you using?
>
> On Tue, Sep 10, 2019 at 6:39 PM Debasish Ghosh 
> wrote:
>
>> I could build using the following command ..
>>
>> mvn clean install -Dcheckstyle.skip -DskipTests -Dscala-2.12
>> -Drat.skip=true
>>
>> regards.
>>
>> On Tue, Sep 10, 2019 at 9:06 PM Yuval Itzchakov 
>> wrote:
>>
>>> Hi,
>>> I'm trying to build Flink from source. I'm using Maven 3.6.1 and
>>> executing the following command:
>>>
>>> mvn clean install -DskipTests -Dfast -Dscala-2.12
>>>
>>> Running both on the master branch and the release-1.9.0 tag, I get the
>>> following error:
>>>
>>> [ERROR] Failed to execute goal
>>> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile
>>> (default-compile) on project flink-streaming-java_2.12: Compilation failure
>>> [ERROR]
>>> /Users/yuval.itzchakov/oss/flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java:[53,10]
>>> cannot find symbol
>>> [ERROR]   symbol:   class Overrides
>>> [ERROR]   location: class
>>> org.apache.flink.streaming.api.operators.StreamSink
>>>
>>> Has anyone run into this problem?
>>>
>>> --
>>> Best Regards,
>>> Yuval Itzchakov.
>>>
>>
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg
>>
>
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Best Regards,
Yuval Itzchakov.


Re: Error While Building Flink From Source

2019-09-10 Thread Yuval Itzchakov
Still getting the same error message using your command. Which Maven
version are you using?

On Tue, Sep 10, 2019 at 6:39 PM Debasish Ghosh 
wrote:

> I could build using the following command ..
>
> mvn clean install -Dcheckstyle.skip -DskipTests -Dscala-2.12
> -Drat.skip=true
>
> regards.
>
> On Tue, Sep 10, 2019 at 9:06 PM Yuval Itzchakov  wrote:
>
>> Hi,
>> I'm trying to build Flink from source. I'm using Maven 3.6.1 and
>> executing the following command:
>>
>> mvn clean install -DskipTests -Dfast -Dscala-2.12
>>
>> Running both on the master branch and the release-1.9.0 tag, I get the
>> following error:
>>
>> [ERROR] Failed to execute goal
>> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile
>> (default-compile) on project flink-streaming-java_2.12: Compilation failure
>> [ERROR]
>> /Users/yuval.itzchakov/oss/flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java:[53,10]
>> cannot find symbol
>> [ERROR]   symbol:   class Overrides
>> [ERROR]   location: class
>> org.apache.flink.streaming.api.operators.StreamSink
>>
>> Has anyone run into this problem?
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>
>
> --
> Debasish Ghosh
> http://manning.com/ghosh2
> http://manning.com/ghosh
>
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com
> Code: http://github.com/debasishg
>


-- 
Best Regards,
Yuval Itzchakov.


Re: Flink SQL: How to tag a column as 'rowtime' from a Avro based DataStream?

2019-09-10 Thread Niels Basjes
Hi.

Can you give me an example of the actual syntax of such a cast?

On Tue, 10 Sep 2019, 16:30 Fabian Hueske,  wrote:

> Hi Niels,
>
> I think (not 100% sure) you could also cast the event time attribute to
> TIMESTAMP before you emit the table.
> This should remove the event time property (and thereby the
> TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output
> types.
>
> Best, Fabian
>
> On Wed, Aug 21, 2019 at 10:51 AM Niels Basjes wrote:
>
>> Hi,
>>
>> It has taken me quite a bit of time to figure this out.
>> This is the solution I have now (works on my machine).
>>
>> Please tell me where I can improve this.
>>
>> Turns out that the schema you provide for registerDataStream only needs
>> the 'top level' fields of the Avro datastructure.
>> With only the top fields there you can still access nested fields with
>> something like "topfield.x.y.z" in the SQL statement.
>>
>> What I found is that the easiest way to make this all work is to ensure
>> the rowtime field in the structure is at the top level (which makes sense
>> in general) and generate the fields string where I only need to know the
>> name of the "rowtime" field.
>>
>> So I have
>>
>> DataStream inputStream = ...
>>
>>
>> then I register the stream with
>>
>>
>> TypeInformation typeInformation = 
>> TypeInformation.of(Measurement.class);
>> String [] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>>
>> List rootSchema = new ArrayList<>();
>> for (String fieldName: fieldNames) {
>> if (rowtimeFieldName.equals(fieldName)) {
>> rootSchema.add(fieldName + ".rowtime");
>> } else {
>> rootSchema.add(fieldName);
>> }
>> }
>>
>> tableEnv.registerDataStream("MeasurementStream", inputStream, 
>> String.join(",", rootSchema));
>>
>>
>> Now after the actual SQL has been executed I have a
>>
>> Table resultTable = ...
>>
>> Now simply feeding this into a DataStream with something like this fails
>> badly.
>>
>> TypeInformation tupleType = new 
>> RowTypeInfo(resultTable.getSchema().getFieldTypes());
>> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
>> tupleType);
>>
>> will result in
>>
>> org.apache.flink.table.api.TableException: The time indicator type is an 
>> internal type only.
>>at 
>> org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>>
>> Turns out that the schema of the output contains a field that was created
>> by TUMBLE_START which is of type TimeIndicatorTypeInfo
>>
>> So I have to do it this way (NASTY!):
>>
>> final TypeInformation[] fieldTypes = 
>> resultTable.getSchema().getFieldTypes();
>> int index;
>> for(index = 0 ; index < fieldTypes.length ; index++) {
>> if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>>fieldTypes[index] = SQL_TIMESTAMP;
>> }
>> }
>> TypeInformation tupleType = new RowTypeInfo(fieldTypes);
>> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
>> tupleType);
>>
>> Which gives me the desired DataStream.
>>
>>
>> Niels Basjes
>>
>>
>>
>>
>>
>> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther  wrote:
>>
>>> Hi Niels,
>>>
>>> if you are coming from DataStream API, all you need to do is to write a
>>> timestamp extractor.
>>>
>>> When you call:
>>>
>>> tableEnv.registerDataStream("TestStream", letterStream,
>>> "EventTime.rowtime, letter, counter");
>>>
>>> The ".rowtime" means that the framework will extract the rowtime from
>>> the stream record timestamp. You don't need to name all fields again but
>>> could simply construct a string from
>>> letterStream.getTypeInfo().getFieldNames(). I hope we can improve this
>>> further in the future as part of FLIP-37.
>>>
>>> Regards,
>>> Timo
>>>
>>> On 14.08.19 at 17:00, Niels Basjes wrote:
>>>
>>> Hi,
>>>
>>> Experimenting with the StreamTableEnvironment I build something like
>>> this:
>>>
>>> DataStream> letterStream = ...
>>> tableEnv.registerDataStream("TestStream", letterStream,
>>> "EventTime.rowtime, letter, counter");
>>>
>>>
>>> Because the "EventTime" was tagged with ".rowtime" it is now being used
>>> as the rowtime and has the DATETIME so I can do this
>>>
>>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>>
>>>
>>> So far so good.
>>>
>>> Working towards a more realistic scenario I have a source that produces
>>> a stream of records that have been defined using Apache Avro.
>>>
>>> So I have a Measurement.avdl that (among other things) contains
>>> something like this:
>>>
>>> record Measurement {
>>>/** The time (epoch in milliseconds since 1970-01-01 UTC) when the
>>> event occurred */
>>> longtimestamp;
>>> string  letter;
>>> longpageviews;
>>> }
>>>
>>>
>>> Now because the registerDataStream call can also derive the schema from
>>> the provided data I can 

Re: Error While Building Flink From Source

2019-09-10 Thread Debasish Ghosh
I could build using the following command ..

mvn clean install -Dcheckstyle.skip -DskipTests -Dscala-2.12 -Drat.skip=true

regards.

On Tue, Sep 10, 2019 at 9:06 PM Yuval Itzchakov  wrote:

> Hi,
> I'm trying to build Flink from source. I'm using Maven 3.6.1 and executing
> the following command:
>
> mvn clean install -DskipTests -Dfast -Dscala-2.12
>
> Running both on the master branch and the release-1.9.0 tag, I get the
> following error:
>
> [ERROR] Failed to execute goal
> org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile
> (default-compile) on project flink-streaming-java_2.12: Compilation failure
> [ERROR]
> /Users/yuval.itzchakov/oss/flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java:[53,10]
> cannot find symbol
> [ERROR]   symbol:   class Overrides
> [ERROR]   location: class
> org.apache.flink.streaming.api.operators.StreamSink
>
> Has anyone run into this problem?
>
> --
> Best Regards,
> Yuval Itzchakov.
>


-- 
Debasish Ghosh
http://manning.com/ghosh2
http://manning.com/ghosh

Twttr: @debasishg
Blog: http://debasishg.blogspot.com
Code: http://github.com/debasishg


Error While Building Flink From Source

2019-09-10 Thread Yuval Itzchakov
Hi,
I'm trying to build Flink from source. I'm using Maven 3.6.1 and executing
the following command:

mvn clean install -DskipTests -Dfast -Dscala-2.12

Running both on the master branch and the release-1.9.0 tag, I get the
following error:

[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:compile
(default-compile) on project flink-streaming-java_2.12: Compilation failure
[ERROR]
/Users/yuval.itzchakov/oss/flink/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java:[53,10]
cannot find symbol
[ERROR]   symbol:   class Overrides
[ERROR]   location: class
org.apache.flink.streaming.api.operators.StreamSink

Has anyone run into this problem?

-- 
Best Regards,
Yuval Itzchakov.


Re: Join with slow changing dimensions/ streams

2019-09-10 Thread Fabian Hueske
Hi Hanan,

BroadcastState and CoMap (or CoProcessFunction) have both advantages and
disadvantages.

Broadcast state is better if the broadcast side is small (only a low data
rate). Its records are replicated to each instance, but the other (larger)
stream does not need to be repartitioned and stays on its partitions.

The CoMapFunction approach is better if both sides are similar in size.
Their records are not replicated but repartitioned and sent over the
network.

This is the common trade-off between broadcast-forward and
repartition-repartition joins that query optimizers of distributed database
systems have to deal with.
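
A rough sketch of the broadcast-state variant, using the SERVER_ID/LOCATION
example from the original question (Event, Location and EnrichedEvent are
hypothetical types, and events/locationUpdates are assumed to be
DataStream<Event> and DataStream<Location>):

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

MapStateDescriptor<String, Location> dimDescriptor =
        new MapStateDescriptor<>("server-locations", String.class, Location.class);

BroadcastStream<Location> dimBroadcast = locationUpdates.broadcast(dimDescriptor);

DataStream<EnrichedEvent> enriched = events
        .connect(dimBroadcast)
        .process(new BroadcastProcessFunction<Event, Location, EnrichedEvent>() {

            @Override
            public void processElement(Event event, ReadOnlyContext ctx,
                                       Collector<EnrichedEvent> out) throws Exception {
                // look up the broadcast dimension entry; may be null before the first update
                Location location = ctx.getBroadcastState(dimDescriptor).get(event.getServerId());
                out.collect(new EnrichedEvent(event, location));
            }

            @Override
            public void processBroadcastElement(Location update, Context ctx,
                                                Collector<EnrichedEvent> out) throws Exception {
                // every parallel instance receives and stores each dimension update
                ctx.getBroadcastState(dimDescriptor).put(update.getServerId(), update);
            }
        });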

Best,
Fabian

On Thu, Sep 5, 2019 at 1:37 PM Hanan Yehudai <
hanan.yehu...@radcom.com>:

> Thanks Fabian.
>
>
> is there any advantage to using broadcast state vs. using just a CoMap function
> on 2 connected streams?
>
>
>
> *From:* Fabian Hueske 
> *Sent:* Thursday, September 5, 2019 12:59 PM
> *To:* Hanan Yehudai 
> *Cc:* flink-u...@apache.org
> *Subject:* Re: Join with slow changing dimensions/ streams
>
>
>
> Hi,
>
>
>
> Flink does not have good support for mixing bounded and unbounded streams
> in its DataStream API yet.
>
> If the dimension table is static (and small enough), I'd use a
> RichMapFunction and load the table in the open() method into the heap.
>
> In this case, you'd probably need to restart the job (can be done with a
> savepoint and restart) to load a new table. You can also use a
> ProcessFunction and register a timer to periodically load a new table.
>
>
>
> If the dimension table is (slowly) changing, you might want to think about
> the broadcast state.
>
> With this setup you can propagate updates by sending them to the
> broadcasted channel.
>
>
>
> I would not use the join operator because it would also buffer the actual
> stream in state.
>
>
>
> Best, Fabian
>
>
>
> On Mon, Sep 2, 2019 at 3:38 PM Hanan Yehudai <
> hanan.yehu...@radcom.com>:
>
> I have a very common use case - enriching the stream with some
> dimension tables.
>
> e.g. the events stream has a SERVER_ID, and other files have the
> LOCATION associated with each SERVER_ID (a dimension table CSV file).
>
> In SQL I would simply join.
> But when using the Flink stream API, as far as I see, there are several
> options and I wondered which would be optimal.
>
>
>
> 1. Use the JOIN operator, from the documentation (
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/joining.html
> 
> )
> This always has some time aspect to the join, unless I use an
> interval join with a very large upper bound and associate the dimension
> stream record with an old timestamp.
>
>
>
> 2. Just write a mapper function that gets the NAME from the dimension
> records, which are preloaded in the MapFunction's loading (open) method.
>
>
>
> 3. Use broadcast state – this way I can also listen to changes on
> the dimension tables and do the actual join in the processElement
> function.
>
>
>
> What would be the most efficient way to do this from a memory and CPU
> consumption perspective?
>
>
>
> Or is there another, better way?
>
>


Re: Flink SQL: How to tag a column as 'rowtime' from a Avro based DataStream?

2019-09-10 Thread Fabian Hueske
Hi Niels,

I think (not 100% sure) you could also cast the event time attribute to
TIMESTAMP before you emit the table.
This should remove the event time property (and thereby the
TimeIndicatorTypeInfo) and you wouldn't need to fiddle with the output
types.

Best, Fabian

On Wed, Aug 21, 2019 at 10:51 AM Niels Basjes wrote:

> Hi,
>
> It has taken me quite a bit of time to figure this out.
> This is the solution I have now (works on my machine).
>
> Please tell me where I can improve this.
>
> Turns out that the schema you provide for registerDataStream only needs
> the 'top level' fields of the Avro datastructure.
> With only the top fields there you can still access nested fields with
> something like "topfield.x.y.z" in the SQL statement.
>
> What I found is that the easiest way to make this all work is to ensure
> the rowtime field in the structure is at the top level (which makes sense
> in general) and generate the fields string where I only need to know the
> name of the "rowtime" field.
>
> So I have
>
> DataStream inputStream = ...
>
>
> then I register the stream with
>
>
> TypeInformation typeInformation = 
> TypeInformation.of(Measurement.class);
> String [] fieldNames = TableEnvironment.getFieldNames(typeInformation);
>
> List rootSchema = new ArrayList<>();
> for (String fieldName: fieldNames) {
> if (rowtimeFieldName.equals(fieldName)) {
> rootSchema.add(fieldName + ".rowtime");
> } else {
> rootSchema.add(fieldName);
> }
> }
>
> tableEnv.registerDataStream("MeasurementStream", inputStream, 
> String.join(",", rootSchema));
>
>
> Now after the actual SQL has been executed I have a
>
> Table resultTable = ...
>
> Now simply feeding this into a DataStream with something like this fails
> badly.
>
> TypeInformation tupleType = new 
> RowTypeInfo(resultTable.getSchema().getFieldTypes());
> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
> tupleType);
>
> will result in
>
> org.apache.flink.table.api.TableException: The time indicator type is an 
> internal type only.
>at 
> org.apache.flink.table.api.TableEnvironment.org$apache$flink$table$api$TableEnvironment$$validateFieldType$1(TableEnvironment.scala:1172)
>
> Turns out that the schema of the output contains a field that was created
> by TUMBLE_START which is of type TimeIndicatorTypeInfo
>
> So I have to do it this way (NASTY!):
>
> final TypeInformation[] fieldTypes = 
> resultTable.getSchema().getFieldTypes();
> int index;
> for(index = 0 ; index < fieldTypes.length ; index++) {
> if (fieldTypes[index] instanceof TimeIndicatorTypeInfo) {
>fieldTypes[index] = SQL_TIMESTAMP;
> }
> }
> TypeInformation tupleType = new RowTypeInfo(fieldTypes);
> DataStream  resultSet = tableEnv.toAppendStream(resultTable, 
> tupleType);
>
> Which gives me the desired DataStream.
>
>
> Niels Basjes
>
>
>
>
>
> On Wed, Aug 14, 2019 at 5:13 PM Timo Walther  wrote:
>
>> Hi Niels,
>>
>> if you are coming from DataStream API, all you need to do is to write a
>> timestamp extractor.
>>
>> When you call:
>>
>> tableEnv.registerDataStream("TestStream", letterStream,
>> "EventTime.rowtime, letter, counter");
>>
>> The ".rowtime" means that the framework will extract the rowtime from the
>> stream record timestamp. You don't need to name all fields again but could
>> simply construct a string from letterStream.getTypeInfo().getFieldNames().
>> I hope we can improve this further in the future as part of FLIP-37.
>>
>> Regards,
>> Timo
>>
>> On 14.08.19 at 17:00, Niels Basjes wrote:
>>
>> Hi,
>>
>> Experimenting with the StreamTableEnvironment I build something like this:
>>
>> DataStream> letterStream = ...
>> tableEnv.registerDataStream("TestStream", letterStream,
>> "EventTime.rowtime, letter, counter");
>>
>>
>> Because the "EventTime" was tagged with ".rowtime" it is now being used
>> as the rowtime and has the DATETIME so I can do this
>>
>> TUMBLE_START(eventTime, INTERVAL '1' MINUTE)
>>
>>
>> So far so good.
>>
>> Working towards a more realistic scenario I have a source that produces a
>> stream of records that have been defined using Apache Avro.
>>
>> So I have a Measurement.avdl that (among other things) contains something
>> like this:
>>
>> record Measurement {
>>/** The time (epoch in milliseconds since 1970-01-01 UTC) when the
>> event occurred */
>> longtimestamp;
>> string  letter;
>> longpageviews;
>> }
>>
>>
>> Now because the registerDataStream call can also derive the schema from
>> the provided data I can do this:
>>
>> DataStream inputStream = ...
>> tableEnv.registerDataStream("DataStream", inputStream);
>>
>>
>> This is very nice because any real schema is big (few hundred columns)
>> and changes over time.
>>
>> Now In the SQL the timestamp is a BIGINT and not a DATETIME and as a
>> 

Re: Flink 1.9 SQL submission failure

2019-09-10 Thread zhangjun
(The body of this message was garbled by a character-encoding problem; the
recoverable fragments mention Kafka and a Kafka consumer group.)

How to implement grouping set in stream

2019-09-10 Thread 刘建刚
  I want to implement grouping sets in streaming. I am new to Flink SQL. I
want to find an example that teaches me how to define my own rule and
implement the corresponding operator. Can anyone give me any suggestions?



Re: Checkpointing is not performing well

2019-09-10 Thread Vijay Bhaskar
To me the task count seems huge for the mentioned resources. To rule out
the possibility of an issue with the state backend, can you start writing
the sink data to a no-op sink, i.e., a sink that ignores the data, and try
whether you can run for a longer duration without any issue? You can then
start decreasing the task manager count until you find a decent count that
has no side effects. Use that value as the task manager count and then
start adding your state backend; first you can try with RocksDB. With a
reduced task manager count you might get good results.

Regards
Bhaskar

On Sun, Sep 8, 2019 at 10:15 AM Rohan Thimmappa 
wrote:

> Ravi, have you looked at the I/O operation (IOPS) rate of the disk? You
> can monitor the IOPS performance and tune it according to your workload.
> This helped us in our project when we hit a wall after tuning pretty much
> all the other parameters.
>
> Rohan
>
>
> --
> *From:* Ravi Bhushan Ratnakar 
> *Sent:* Saturday, September 7, 2019 5:38 PM
> *To:* Rafi Aroch
> *Cc:* user
> *Subject:* Re: Checkpointing is not performing well
>
> Hi Rafi,
>
> Thank you for your quick response.
>
> I have tested with the RocksDB state backend. RocksDB required
> significantly more task managers to perform comparably to the filesystem
> state backend. The problem here is that the checkpoint process is not fast
> enough to complete.
>
> Our requirement is to checkpoint as soon as possible, e.g. within 5
> seconds, to flush the output to the output sink. As the incoming data rate
> is high, it is not able to complete quickly. If I increase the checkpoint
> interval, the state size grows much faster and hence takes much longer to
> complete checkpointing. I also tried the AT LEAST ONCE mode, but it does
> not improve things much. Adding more task managers to increase parallelism
> also does not improve the checkpointing performance.
>
> Is it possible to achieve checkpointing as short as 5 seconds with such a
> high input volume?
>
> Regards,
> Ravi
>
> On Sat 7 Sep, 2019, 22:25 Rafi Aroch,  wrote:
>
>> Hi Ravi,
>>
>> Consider moving to the RocksDB state backend, where you can enable
>> incremental checkpointing. This will make your checkpoint size stay pretty
>> much constant even when your state becomes larger.
>>
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/state_backends.html#the-rocksdbstatebackend
>>
>>
>> Thanks,
>> Rafi
>>
>> On Sat, Sep 7, 2019, 17:47 Ravi Bhushan Ratnakar <
>> ravibhushanratna...@gmail.com> wrote:
>>
>>> Hi All,
>>>
>>> I am writing a streaming application using Flink 1.9. This application
>>> consumes data from a Kinesis stream whose payload is basically Avro. The
>>> application uses a KeyedProcessFunction to execute business logic on the
>>> basis of a correlation id, using event time characteristics, with the
>>> configuration below:
>>> StateBackend - filesystem with S3 storage
>>> registerTimeTimer duration for each key is  -  currentWatermark  + 15
>>> seconds
>>> checkpoint interval - 1min
>>> minPauseBetweenCheckpointInterval - 1 min
>>> checkpoint timeout - 10mins
>>>
>>> incoming data rate from kinesis -  ~10 to 21GB/min
>>>
>>> Number of Task manager - 200 (r4.2xlarge -> 8cpu,61GB)
>>>
>>> The first 2-4 checkpoints complete within 1 minute, when the state size is
>>> usually 50 GB. As the state size grows beyond 50 GB, checkpointing starts
>>> taking more than 1 minute, increases up to 10 minutes, and then the
>>> checkpoint fails. The moment a checkpoint takes more than 1 minute to
>>> complete, the application starts processing slowly and lagging in its
>>> output.
>>>
>>> Any suggestion to fine tune checkpoint performance would be highly
>>> appreciated.
>>>
>>> Regards,
>>> Ravi
>>>
>>
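
For reference, a rough sketch of how the checkpoint settings described above
map onto the DataStream API, together with the incremental RocksDB
checkpointing suggested earlier (the S3 URI is a placeholder):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(60_000L);                                  // checkpoint interval - 1 min
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000L);  // min pause between checkpoints - 1 min
env.getCheckpointConfig().setCheckpointTimeout(600_000L);          // checkpoint timeout - 10 mins

// RocksDB state backend with incremental checkpoints enabled (second argument);
// note that this constructor declares IOException
env.setStateBackend(new RocksDBStateBackend("s3://my-bucket/flink-checkpoints", true));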


Re: Kafka and exactly-once

2019-09-10 Thread maqy
(The body of this message was garbled by a character-encoding problem; the
recoverable fragments are a reply to Jasine Chen about Kafka, a reference to
https://zhuanlan.zhihu.com/p/77677075, and a quoted message from Jimmy Wong.)

maqy

Flink job submission failure

2019-09-10 Thread chengya...@idengyun.com
Version: Flink 1.7.1
OS: CentOS 7
start-cluster.sh was already executed before submitting the job
Job submission host name: test04

When I submit the Flink job with the following command it always fails with the error below. Could you please help me find the cause? Thanks.

$ bin/flink run -m test04:8081 -c 
org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass 
flink-scala-project1.jar
Starting execution of program


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Could not retrieve 
the execution result. (JobID: f9ac0c76e0e44cac6d6c3b1c41afa161)
at 
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:261)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:487)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
at 
org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:645)
at 
org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass$.main(FlinkConsumerKafkaSinkToKuduMainClass.scala:16)
at 
org.apache.flink.quickstart.factory.FlinkConsumerKafkaSinkToKuduMainClass.main(FlinkConsumerKafkaSinkToKuduMainClass.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:813)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:287)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)
at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1126)
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to 
submit JobGraph.
at 
org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:380)
at 
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at 
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:216)
at 
java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at 
java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at 
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.readRawResponse(RestClient.java:515)
at 
org.apache.flink.runtime.rest.RestClient$ClientHandler.channelRead0(RestClient.java:452)
at 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 
org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at 

Re: How do I start a Flink application on my Flink+Mesos cluster?

2019-09-10 Thread Felipe Gutierrez
I managed to find what was going wrong. I will write here just for the
record.

First, the master machine was not logging in to itself automatically, so I
had to fix the permissions for it:

chmod og-wx ~/.ssh/authorized_keys
chmod 750 $HOME

Then I set "mesos.resourcemanager.tasks.cpus" to be equal to or less than
the available cores on a single node of the cluster. I am not sure about
this parameter, but it only worked after this configuration.
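
A sketch of the relevant conf/flink-conf.yaml entries (the values are
illustrative; mesos.resourcemanager.tasks.mem is shown only as the related
memory knob and was not part of the discussion above):

mesos.resourcemanager.tasks.cpus: 2     # must fit the cores of a single Mesos agent
mesos.resourcemanager.tasks.mem: 4096   # task manager memory in MB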

Felipe
*--*
*-- Felipe Gutierrez*

*-- skype: felipe.o.gutierrez*
*--* *https://felipeogutierrez.blogspot.com
*


On Fri, Sep 6, 2019 at 10:36 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi,
>
> I am running Mesos without DC/OS [1] and Flink on it. When I start my
> cluster I receive some messages suggesting everything was started.
> However, I see 0 slots available on the Flink web dashboard. But I suppose
> that Mesos will allocate slots and task managers dynamically. Is that right?
>
> $ ./bin/mesos-appmaster.sh &
> [1] 16723
> flink@r03:~/flink-1.9.0$ I0906 10:22:45.080328 16943 sched.cpp:239]
> Version: 1.9.0
> I0906 10:22:45.082672 16996 sched.cpp:343] New master detected at
> mas...@xxx.xxx.xxx.xxx:5050
> I0906 10:22:45.083276 16996 sched.cpp:363] No credentials provided.
> Attempting to register without authentication
> I0906 10:22:45.086840 16997 sched.cpp:751] Framework registered with
> 22f6a553-e8ac-42d4-9a90-96a8d5f002f0-0003
>
> Then I deploy my Flink application. When I use the first command below, the
> application starts; however, the tasks remain CREATED until Flink throws a
> timeout exception. In other words, they never turn to RUNNING. When I use
> the second command, the application does not start and I receive the
> exception "Could not allocate all requires slots within timeout of 30
> ms. Slots required: 2". The full stacktrace is below.
>
> $ /home/flink/flink-1.9.0/bin/flink run
> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
> $ ./bin/mesos-appmaster-job.sh run
> /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/mesos.html#mesos-without-dcos
> ps.: my application runs normally on a standalone Flink cluster.
>
> 
>  The program finished with the following exception:
>
> org.apache.flink.client.program.ProgramInvocationException: Job failed.
> (JobID: 7ad8d71faaceb1ac469353452c43dc2a)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
> at
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
> at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at org.hello_flink_mesos.App.(App.java:35)
> at org.hello_flink_mesos.App.main(App.java:285)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
> at
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
> at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
> execution failed.
> at
> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
> at
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
> ... 22 more
> Caused by:
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
> Could not allocate all requires slots within timeout of 30 ms. Slots
> required: 2, slots allocated: 0, previous allocation IDs: [], execution
> status: completed 

Re: Discussion of large Flink state reading from disk, saturating disk IO, and jobs interfering with each other

2019-09-10 Thread Wesley Peng




on 2019/9/10 13:47, 蒋涛涛 wrote:

What we have tried:

1. Manually migrating jobs with high IO to other machines, but YARN job
placement is fairly random, so this only works occasionally.

2. We have no SSDs, only ordinary SATA disks; we added two disks to raise
the disk IO capacity, but the single-disk IO bottleneck for a single job is
still there.

Are there any other strategies that could solve or mitigate this?


It seems the tricks to improve RocksDB's throughput might be helpful.

With writes and reads accessing mostly the recent data, our goal is to 
let them stay in memory as much as possible without using up all the 
memory on the server. The following parameters are worth tuning:


Block cache size: When uncompressed blocks are read from SSTables, they 
are cached in memory. The amount of data that can be stored before 
eviction policies apply is determined by the block cache size. The 
bigger the better.


Write buffer size: How big can Memtable get before it is frozen. 
Generally, the bigger the better. The tradeoff is that big write buffer 
takes more memory and longer to flush to disk and to recover.


Write buffer number: How many Memtables to keep before flushing to 
SSTable. Generally, the bigger the better. Similarly, the tradeoff is 
that too many write buffers take up more memory and longer to flush to disk.


Minimum write buffers to merge: If most recently written keys are 
frequently changed, it is better to only flush the latest version to 
SSTable. This parameter controls how many Memtables it will try to merge 
before flushing to SSTable. It should be less than the write buffer 
number. A suggested value is 2. If the number is too big, it takes 
longer to merge buffers and there is less chance of duplicate keys in 
that many buffers.


The list above is far from being exhaustive, but tuning them correctly 
can have a big impact on performance. Please refer to RocksDB’s Tuning 
Guide for more details on these parameters. Figuring out the optimal 
combination of values for all of them is an art in itself.


please ref: https://klaviyo.tech/flinkperf-c7bd28acc67
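
A rough sketch of how the parameters above can be passed to Flink's RocksDB
state backend through an OptionsFactory (Flink 1.9 API; all values are
illustrative, not recommendations):

import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

public class TunedRocksDBOptions implements OptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        return currentOptions;  // DB-level options left unchanged in this sketch
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions
                .setWriteBufferSize(256 * 1024 * 1024)       // write buffer size
                .setMaxWriteBufferNumber(4)                  // write buffer number
                .setMinWriteBufferNumberToMerge(2)           // minimum write buffers to merge
                .setTableFormatConfig(new BlockBasedTableConfig()
                        .setBlockCacheSize(512 * 1024 * 1024));  // block cache size
    }
}

// usage: rocksDbStateBackend.setOptions(new TunedRocksDBOptions());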

regards.