Re: Flink SSL Setup on a standalone cluster

2018-03-23 Thread Vinay Patil
Hi,

The passwords are shown in plain text in the logs. Is this fixed in newer
versions of Flink? (I am using 1.3.2.)

Also, please let me know the answers to my previous queries in this mail
chain.

Regards,
Vinay Patil

On Mon, Mar 19, 2018 at 7:35 PM, Vinay Patil 
wrote:

> Hi,
>
> When I set ssl.verify.hostname to true, the job fails with an SSL handshake
> exception where it tries to match the IP address instead of the hostname
> in the certificates. Everything works when I set this to false. The
> keystore is created with the FQDN.
> The solution of adding all the hostnames and IP addresses to the SAN list
> was rejected by the company.
>
> And a security concern is raised when I set this parameter to false. I see
> that https://issues.apache.org/jira/browse/FLINK-5030 is in Unresolved
> state.
> How does Flink support hostname verification?
>
> @Chesnay: It would be helpful to get an answer to my previous mail
>
> Regards,
> Vinay Patil
>
> On Fri, Mar 16, 2018 at 10:15 AM, Vinay Patil 
> wrote:
>
>> Hi Chesnay,
>>
>> After setting the configurations for the Remote Execution Environment the
>> job gets submitted; I had to set ssl-verify-hostname to false.
>> However, I don't understand why there is a need to do it. I am running
>> the job from the master node itself and providing all the configurations in
>> flink-conf.yaml while creating the cluster. So why do I have to copy the
>> same stuff in code?
>>
>> Regards,
>> Vinay Patil
>>
>> On Fri, Mar 16, 2018 at 8:23 AM, Vinay Patil 
>> wrote:
>>
>>> Hi,
>>>
>>> No, I am not passing any config to the remote execution environment. I am
>>> running the job from the master node itself. I have provided the SSL
>>> configs in flink-conf.yaml.
>>>
>>> Do I need to specify any SSL config as part of the Remote Execution env?
>>>
>>> If yes, can you please provide me an example?
>>>
>>>
>>>
>>> On Mar 16, 2018 1:56 AM, "Chesnay Schepler [via Apache Flink User
>>> Mailing List archive.]"  wrote:
>>>
>>> How are you creating the remote environment? In particular, are you
>>> passing a configuration to the RemoteEnvironment?
>>> Have you set the SSL options in the config?
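
For reference, a minimal sketch of passing the SSL options to a remote environment. The security.ssl.* keys mirror the ones in flink-conf.yaml; the host, paths and passwords below are placeholders, so treat this as an assumption rather than a confirmed recipe:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration clientConfig = new Configuration();
clientConfig.setBoolean("security.ssl.enabled", true);
// Use the same values as in flink-conf.yaml on the cluster.
clientConfig.setString("security.ssl.keystore", "/path/to/keystore.jks");
clientConfig.setString("security.ssl.keystore-password", "***");
clientConfig.setString("security.ssl.key-password", "***");
clientConfig.setString("security.ssl.truststore", "/path/to/truststore.jks");
clientConfig.setString("security.ssl.truststore-password", "***");
clientConfig.setBoolean("security.ssl.verify-hostname", true);

StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-fqdn", 6123, clientConfig, "/path/to/job-jar-with-dependencies.jar");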
>>>
>>>
>>> On 15.03.2018 22:46, Vinay Patil wrote:
>>>
>>> Hi,
>>>
>>> I even tried the IP address for the JobManager.host.name property, but it
>>> did not work. When I run netstat -anp | grep 6123, I see 3 TM connections
>>> in ESTABLISHED state; however, when I submit the job, I see two more
>>> entries in TIME_WAIT state, and after some time these entries are gone
>>> and I get a Lost Connection to Job Manager exception.
>>>
>>> This only happens when SSL is enabled.
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Thu, Mar 15, 2018 at 10:28 AM, Vinay Patil <[hidden email]
>>> > wrote:
>>>
 Just an update: I am submitting the job from the master node, not
 using the normal flink run command to submit the job, but using the Remote
 Execution Environment in code to do this.

 And in that I am passing the hostname, which is the same as provided in
 flink-conf.yaml.

 Regards,
 Vinay Patil

 On Thu, Mar 15, 2018 at 7:57 AM, Vinay Patil <[hidden email]
 > wrote:

> Hi Guys,
>
> Any suggestions here
>
> Regards,
> Vinay Patil
>
> On Wed, Mar 14, 2018 at 8:08 PM, Vinay Patil <[hidden email]
> > wrote:
>
>> Hi,
>>
>> After waiting for some time I got the exception "Lost Connection to
>> Job Manager. Message: Could not retrieve the JobExecutionResult from Job
>> Manager".
>>
>> I am submitting the job via the remote execution environment. I have
>> specified the exact hostname of the JobManager and port 6123.
>>
>> Please let me know if any other configurations are needed.
>>
>> Regards,
>> Vinay Patil
>>
>> On Wed, Mar 14, 2018 at 11:48 AM, Vinay Patil <[hidden email]
>> > wrote:
>>
>>> Hi Timo,
>>>
>>> I am not getting any exception; it just says waiting for job completion
>>> with a Job ID printed.
>>>
>>>
>>>
>>> Regards,
>>> Vinay Patil
>>>
>>> On Wed, Mar 14, 2018 at 11:34 AM, Timo Walther [via Apache Flink
>>> User Mailing List archive.] <[hidden email]
>>> > wrote:
>>>
 Hi Vinay,

 do you have any exception or log entry that describes the failure?

 Regards,
 Timo


 Am 14.03.18 um 15:51 schrieb Vinay Patil:

 Hi,

 I have a keystore for each of the 4 nodes in the cluster and the respective
 truststore. The cluster is configured correctly with SSL; I verified this by
 accessing the job manager using https and 

"dynamic" bucketing sink

2018-03-23 Thread Christophe Jolif
Hi all,

I'm using the nice topic pattern feature on the KafkaConsumer to read from
multiple topics, automatically discovering new topics added into the system.

At the end of the processing I'm sinking the result into a Hadoop
Filesystem using a BucketingSink.

All works great until I get the requirement to sink into a different Hadoop
Filesystem based on the input topic.

One way to do this would obviously be to get rid of the topic pattern and
start a (similar) job per topic, each of which would get its own sink to its
own filesystem, and start new jobs when new topics are added. But that's
far from ideal. It would lead to the usual issues with Flink and a
dynamic number of jobs (requiring new task slots...), and obviously it
would also require some external machinery to know that new topics have been
added, create new jobs, etc.

What would be the recommended way to have a "dynamic" BucketingSink that
can not only write to several basePaths (not too hard I guess) but also
dynamically add new base paths when new topics come into the system?
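
A sketch of the per-topic directory part (not the separate-filesystems part): a custom Bucketer that derives the bucket from the topic, assuming a hypothetical record type that carries the topic it was read from.

import org.apache.flink.streaming.connectors.fs.Clock;
import org.apache.flink.streaming.connectors.fs.bucketing.Bucketer;
import org.apache.hadoop.fs.Path;

// TopicTaggedEvent is a made-up POJO with a getTopic() accessor.
public class TopicBucketer implements Bucketer<TopicTaggedEvent> {

    @Override
    public Path getBucketPath(Clock clock, Path basePath, TopicTaggedEvent element) {
        // One sub-directory per source topic under the common base path.
        return new Path(basePath, element.getTopic());
    }
}

Set via BucketingSink#setBucketer(), this still writes under the single base path/filesystem the sink was created with, so it does not by itself answer the multiple-filesystems question.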

Thanks,
-- 
Christophe


Re: Incremental checkpointing performance

2018-03-23 Thread Nico Kruber
Hi Miyuru,
regarding "state.backend", I was looking at version 1.5 docs and some
things changed compared to 1.3. The "Asynchronous RocksDB snapshot ..."
messages only occur with full snapshots, i.e. non-incremental, and I
verified this for your program as well.
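
For reference, a minimal sketch of how the RocksDB backend with incremental checkpointing is typically enabled in code (the checkpoint URI and interval are placeholders; the flink-statebackend-rocksdb dependency is required):

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IncrementalCheckpointJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The second constructor argument enables incremental checkpoints.
        env.setStateBackend(new RocksDBStateBackend("file:///home/ubuntu/tmp-flink-rocksdb", true));
        env.enableCheckpointing(10000); // trigger a checkpoint every 10 seconds
        // ... job definition and env.execute(...) would follow
    }
}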

There are some issues with your project though:
1) your Flink dependencies should all have the same version
2) your source does not acquire the checkpoint lock before emitting
events (see the docs around the SourceFunction you are implementing)
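
As a rough illustration of point 2), a minimal source that emits under the checkpoint lock (the class name and emitted type are invented for the example):

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class LockedCounterSource implements SourceFunction<Long> {

    private volatile boolean running = true;
    private long counter = 0; // state that a checkpoint must observe consistently

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Emit and update state while holding the checkpoint lock so that
            // a snapshot never sees a half-updated source.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter);
                counter++;
            }
            Thread.sleep(10);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}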


Regarding the checkpoint sizes: you can rely on the web interface
reporting correct metrics. However, the "average" values may not be too
much useful for you since you are using a sliding count window and thus
during ramp-up (until you get your 1 windows of the slide size) you
will have smaller states than after than. Since you only have 2 keys,
you will eventually have 2 window states to store and from then on
stay with this number. So rather look at the "History" column of the web
interface or into the JobManager log.


Regarding the original issue: I was recently made aware of another thing
which may influence the speed of an incremental snapshot: if done
incrementally, we need to close and flush RocksDB's sst file so that it
continues with a new file and we can hardlink and copy a consistent
snapshot. For full snapshots, we simply iterate over all items to copy.
Now this close-and-flush may be more costly (hence the higher duration),
and since it cannot be done asynchronously (as a full snapshot can), we
also may not process as many records.
-> Therefore, you probably did not run your program long enough to
create the full set of windows, and I'm guessing you will eventually get
to the same checkpoint sizes.


TL;DR: incremental snapshots are only worth it (and are designed for the
case) where you have a lot of operator state (not just a few MB!) while only
a few parts actually change between checkpoints. In these scenarios, the
network transfer time saved by shipping only the changed parts to the
checkpoint store outweighs the additional cost during snapshot creation.


Nico


On 21/03/18 06:01, Miyuru Dayarathna wrote:
> Hi,
> 
> Since we could not observe log messages such as "Asynchronous RocksDB
> snapshot" in Flink's log files, we ran the application with Flink
> 1.3.3 as well. But it also did not print the log message. Hence I am
> wondering whether we ran Flink's incremental checkpointing in the
> correct manner. I have attached the complete application with this
> email. Could you please run this in your setup and let me know whether
> you get the incremental checkpoint related logs printed in your Flink setup?
> 
> Thanks,
> Miyuru
> 
> 
> 
> 
> On Monday, 19 March 2018, 23:33:37 GMT+5:30, Miyuru Dayarathna
>  wrote:
> 
> 
> Hi Nico,
> 
> Thanks for the detailed explanation. The only change I have made in my
> flink-conf.yaml file is the following.
> 
> state.backend.fs.checkpointdir: file:///home/ubuntu/tmp-flink-rocksdb
> 
> The default "state.backend" value is set to filesystem. Removing the
> env.setStateBackend() method code or changing the "state.backend"
> property to rocksdb does not change the state backend to RocksDB. I got
> this verified by looking at the Flink log files. I have mentioned a
> sample of the log file for your reference.
> 
> ---
> carbon-5th:38631/user/taskmanager) as 1ac63dfb481eab3d3165a965084115f3.
> Current number of registered hosts is 1. Current number of alive task
> slots is 1.
> 2018-03-19 23:10:11,606 INFO 
> org.apache.flink.runtime.client.JobClient - Checking
> and uploading JAR files
> 2018-03-19 23:10:11,618 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    -
> Submitting job 7c19a14f4e75149ffaa064fac7e2bf29 (Flink application.).
> 2018-03-19 23:10:11,623 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    - Using
> restart strategy
> FixedDelayRestartStrategy(maxNumberRestartAttempts=2147483647,
> delayBetweenRestartAttempts=1) for 7c19a14f4e75149ffaa064fac7e2bf29.
> 2018-03-19 23:10:11,636 INFO 
> org.apache.flink.runtime.executiongraph.ExecutionGraph    - Job
> recovers via failover strategy: full graph restart
> 2018-03-19 23:10:11,648 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    - Running
> initialization on master for job Flink application.
> (7c19a14f4e75149ffaa064fac7e2bf29).
> 2018-03-19 23:10:11,648 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    -
> Successfully ran initialization on master in 0 ms.
> 2018-03-19 23:10:11,664 INFO 
> org.apache.flink.runtime.jobmanager.JobManager    - Using
> application-defined state backend for checkpoint/savepoint metadata:
> RocksDB State Backend {isInitialized=false, configuredDbBasePaths=null,
> initializedDbBasePaths=null, checkpointStreamBackend=File State Backend
> @ file:/home/ubuntu/tmp-flink-rocksdb}.
> 

RE: Lucene SPI class loading fails with shaded flink-connector-elasticsearch

2018-03-23 Thread Haddadi Manuel
Hi Gordon, hi Till,


Thanks for your feedback. I am happy to contribute by describing more precisely how the bug 
occurred, if it might help.


First, to describe a bit more what my Flink job does: part of its execution 
plan is a ProcessFunction which basically stores the events as Lucene 
documents in an in-memory Lucene index. When the number of documents reaches a 
threshold, the process function fires Lucene queries to filter the documents 
(and thus the events) according to user models.
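
For context, a condensed and partly invented sketch of that kind of function (the Event type, field names and threshold are placeholders; the real user-model queries and output handling are omitted):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.store.RAMDirectory;

public class LuceneFilterFunction extends ProcessFunction<Event, Event> {

    private transient RAMDirectory directory;
    private transient IndexWriter writer;
    private transient long docCount;

    @Override
    public void open(Configuration parameters) throws Exception {
        directory = new RAMDirectory();
        writer = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) throws Exception {
        Document doc = new Document();
        doc.add(new TextField("payload", event.getPayload(), Field.Store.YES));
        writer.addDocument(doc);

        if (++docCount >= 10000) { // threshold value is made up
            writer.commit();
            try (DirectoryReader reader = DirectoryReader.open(directory)) {
                IndexSearcher searcher = new IndexSearcher(reader);
                QueryParser parser = new QueryParser("payload", new StandardAnalyzer());
                TopDocs hits = searcher.search(parser.parse("payload:someTerm"), 10);
                // the real job maps the hits back to events and emits them via out
            }
            docCount = 0;
        }
    }
}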


Therefore this process function depends on the Lucene modules lucene-core, 
lucene-queryparser and lucene-analyzers-common in version 6.3.0 (as a precaution 
we chose the same version as elasticsearch:5.1.2).


Later the event stream is sent to an Elasticsearch index via the 
flink-connector-elasticsearch5 module.


I have upgraded the Flink dependencies from version 1.3.2 to 1.4.2. When the job 
was deployed on a YARN cluster, it raised this error:

java.util.ServiceConfigurationError: An SPI class of type 
org.apache.lucene.codecs.PostingsFormat with classname 
org.apache.lucene.search.suggest.document.Completion50PostingsFormat does not 
exist, please fix the file 
'META-INF/services/org.apache.lucene.codecs.PostingsFormat' in your classpath.


So I checked the META-INF/services/org.apache.lucene.codecs.PostingsFormat file in 
my job's fat jar. It contained several implementations of PostingsFormat to be 
loaded:

org.apache.lucene.search.suggest.document.Completion50PostingsFormat
org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat
org.apache.lucene.codecs.lucene50.Lucene50PostingsFormat
org.apache.lucene.codecs.idversion.IDVersionPostingsFormat

I don't know exactly how the maven-shade-plugin operates, but it seems to me that 
it merges service configuration files of the same name from different modules into one file.

For example, in elasticsearch-5.1.2.jar, the file 
org.apache.lucene.codecs.PostingsFormat is:

org.apache.lucene.search.suggest.document.Completion50PostingsFormat
org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat

In flink-connector-elasticsearch5_2.11-1.4.2.jar, the file 
org.apache.lucene.codecs.PostingsFormat is:

org.apache.lucene.search.suggest.document.Completion50PostingsFormat
org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat
#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements.  See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

org.apache.lucene.codecs.lucene50.Lucene50PostingsFormat
#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements.  See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

org.apache.lucene.codecs.idversion.IDVersionPostingsFormat
#
#  Licensed to the Apache Software Foundation (ASF) under one or more
#  contributor license agreements.  See the NOTICE file distributed with
#  this work for additional information regarding copyright ownership.
#  The ASF licenses this file to You under the Apache License, Version 2.0
#  (the "License"); you may not use this file except in compliance with
#  the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

org.apache.lucene.search.suggest.document.Completion50PostingsFormat

Since my job's fat jar inherits configuration files in META-INF/services from 
its dependencies, I guess this 

Flink CEP detect multiple patterns within their specific time windows

2018-03-23 Thread shishal
Hi,

I have 2 different patterns: the 1st pattern should appear within 4 hours and
the other pattern should appear within 24 hours.

I can go ahead and create 2 jobs running on the Flink cluster. But is it
possible to use the same job to detect both patterns within their own time
windows, as my source stream and Elasticsearch sink are the same for both jobs?
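
In principle the same job can apply two independent patterns to the same stream, each with its own within() window. A rough sketch (the Event type, conditions and field names are placeholders, not from this thread):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Event> events = ...; // the shared Kafka source stream

Pattern<Event, ?> shortPattern = Pattern.<Event>begin("first")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "A".equals(e.getType()); }
        })
        .followedBy("second")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "B".equals(e.getType()); }
        })
        .within(Time.hours(4));

Pattern<Event, ?> longPattern = Pattern.<Event>begin("start")
        .where(new SimpleCondition<Event>() {
            @Override
            public boolean filter(Event e) { return "C".equals(e.getType()); }
        })
        .within(Time.hours(24));

PatternStream<Event> shortMatches = CEP.pattern(events, shortPattern);
PatternStream<Event> longMatches = CEP.pattern(events, longPattern);
// Both PatternStreams can be select()-ed and written to the same Elasticsearch sink.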

Regards,
Shishal



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Lucene SPI class loading fails with shaded flink-connector-elasticsearch

2018-03-23 Thread Tzu-Li (Gordon) Tai
Hi Manuel,

Thanks a lot for reporting this!

Yes, this issue is most likely related to the recent changes to shading the 
Elasticsearch connector dependencies, though it is a bit curious why I didn’t 
bump into it before while testing it.

The Flink job runs Lucene queries on a data stream which ends up in an 
Elasticsearch index.

Could you explain a bit more where the Lucene queries are executed? Were there 
other dependencies required for this?

I would highly appreciate any opinion on this workaround. Could it have side 
effects?

I think your workaround wouldn’t be harmful. Could you explain how you came to 
the solution? That would help me in getting to the bottom of the problem (and 
maybe other potential similar issues).

Cheers,
Gordon

On 23 March 2018 at 12:43:31 AM, Till Rohrmann (till.rohrm...@gmail.com) wrote:

Hi Manuel,

thanks for reporting this issue. It sounds to me like a bug we should fix. I've 
pulled Gordon into the conversation since he will most likely know more about 
the ElasticSearch connector shading.

Cheers,
Till

On Thu, Mar 22, 2018 at 5:09 PM, Haddadi Manuel  wrote:
Hello,
 
When upgrading from flink-1.3.2 to flink-1.4.2, I faced this error at runtime 
of a Flink job:
 
java.util.ServiceConfigurationError: An SPI class of type 
org.apache.lucene.codecs.PostingsFormat with classname 
org.apache.lucene.search.suggest.document.Completion50PostingsFormat does not 
exist, please fix the file 
'META-INF/services/org.apache.lucene.codecs.PostingsFormat' in your classpath.
 
I added the lucene-suggest dependency and then I encountered this:
java.lang.ClassCastException: class 
org.elasticsearch.search.suggest.completion2x.Completion090PostingsFormat
 
The Flink job runs Lucene queries on a data stream which ends up in an 
Elasticsearch index.
 
It seems to me that this exception is a side effect of shading the 
flink-connector-elasticsearch5 dependencies. Actually, the only solution I 
have found is to rebuild the flink-connector-elasticsearch5 jar, excluding 
META-INF/services/org.apache.lucene.codecs.*
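
As a side note, one way to check at runtime which PostingsFormat implementations Lucene's SPI registry actually resolves (a small diagnostic sketch, not from the original mail):

import org.apache.lucene.codecs.PostingsFormat;

public class PostingsFormatCheck {
    public static void main(String[] args) {
        // Lists the postings formats Lucene's SPI loader found on the classpath.
        for (String name : PostingsFormat.availablePostingsFormats()) {
            System.out.println(name + " -> " + PostingsFormat.forName(name).getClass().getName());
        }
    }
}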
 
I would highly appreciate any opinion on this workaround. Could it have side 
effects?
 
Thanks. And by the way, congrats to all Flink contributors, this is a pretty 
good piece of technology!
 
Regards,
 
Manuel Haddadi




Re: Example PopularPlacesFromKafka fails to run

2018-03-23 Thread Fabian Hueske
Thanks for the feedback!

Fabian

2018-03-23 8:02 GMT+01:00 James Yu :

> Hi,
>
> When I proceed further to the timePrediction exercise
> (http://training.data-artisans.com/exercises/timePrediction.html), I realize
> that the nycTaxiRides.gz format is fine.
> The problem is in TaxiRide.toString(): the columns were serialized in the
> wrong order. Hence the data persisted in Kafka has the wrong format.
> Therefore I changed TaxiRide.toString() to the following:
>
>   public String toString() {
> StringBuilder sb = new StringBuilder();
> sb.append(rideId).append(",");
> sb.append(isStart ? "START" : "END").append(",");
> sb.append(startTime.toString(timeFormatter)).append(",");
> sb.append(endTime.toString(timeFormatter)).append(",");
> sb.append(startLon).append(",");
> sb.append(startLat).append(",");
> sb.append(endLon).append(",");
> sb.append(endLat).append(",");
> sb.append(passengerCnt).append(",");
> sb.append(taxiId).append(",");
> sb.append(driverId);
>
> return sb.toString();
>   }
>
>
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275
>
> 2018-03-23 9:59 GMT+08:00 James Yu :
>
>> Just figured out the data format in nycTaxiRides.gz doesn't match the
>> way TaxiRide.java interprets the lines fed into it.
>> Then I checked the exercise training GitHub and found that the TaxiRide.java
>> (https://github.com/dataArtisans/flink-training-exercises/tree/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes)
>> was recently updated (like 11 days ago).
>> After making some changes to TaxiRide.java, the example works like a
>> charm.
>>
>> I got the nycTaxiRides.gz by issuing this line in console:
>> wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
>>
>> Following are the changes I made to TaxiRide.java locally (basically just
>> the indices into the tokens variable):
>> try {
>> ride.rideId = Long.parseLong(tokens[0]);
>>
>> switch (tokens[3]) {
>> case "START":
>> ride.isStart = true;
>> ride.startTime = DateTime.parse(tokens[4], timeFormatter);
>> ride.endTime = DateTime.parse(tokens[5], timeFormatter);
>> break;
>> case "END":
>> ride.isStart = false;
>> ride.endTime = DateTime.parse(tokens[4], timeFormatter);
>> ride.startTime = DateTime.parse(tokens[5], timeFormatter);
>> break;
>> default:
>> throw new RuntimeException("Invalid record: " + line);
>> }
>>
>> ride.startLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) :
>> 0.0f;
>> ride.startLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) :
>> 0.0f;
>> ride.endLon = tokens[8].length() > 0 ? Float.parseFloat(tokens[8]) : 0.0f;
>> ride.endLat = tokens[9].length() > 0 ? Float.parseFloat(tokens[9]) : 0.0f;
>> ride.passengerCnt = Short.parseShort(tokens[10]);
>> ride.taxiId = Long.parseLong(tokens[1]);
>> ride.driverId = Long.parseLong(tokens[2]);
>>
>> } catch (NumberFormatException nfe) {
>> throw new RuntimeException("Invalid record: " + line, nfe);
>> }
>>
>>
>> This is a UTF-8 formatted mail
>> ---
>> James C.-C.Yu
>> +886988713275
>>
>> 2018-03-23 8:06 GMT+08:00 James Yu :
>>
>>> Hi,
>>>
>>> I fail to run the PopularPlacesFromKafka example with the following
>>> exception, and I wonder what might cause this "Invalid record" error?
>>>
>>> when running within Intellij IDEA -->
>>> 07:52:23.960 [Source: Custom Source -> Map (7/8)] INFO
>>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source ->
>>> Map (7/8) (930e95aac65cbda39d9f1eaa41891253) switched from RUNNING to
>>> FAILED.
>>> java.lang.RuntimeException: Invalid record: 4010,2013003778,2013003775,START,2013-01-01
>>> 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
>>> at com.dataartisans.flinktraining.exercises.datastream_java.dat
>>> atypes.TaxiRide.fromString(TaxiRide.java:119)
>>> ~[flink-training-exercises-0.15.1.jar:na]
>>> at com.dataartisans.flinktraining.exercises.datastream_java.uti
>>> ls.TaxiRideSchema.deserialize(TaxiRideSchema.java:37)
>>> ~[flink-training-exercises-0.15.1.jar:na]
>>> at com.dataartisans.flinktraining.exercises.datastream_java.uti
>>> ls.TaxiRideSchema.deserialize(TaxiRideSchema.java:28)
>>> ~[flink-training-exercises-0.15.1.jar:na]
>>> at org.apache.flink.streaming.util.serialization.KeyedDeseriali
>>> zationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
>>> ~[flink-training-exercises-0.15.1.jar:na]
>>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09
>>> Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
>>> ~[flink-training-exercises-0.15.1.jar:na]
>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsum
>>> erBase.run(FlinkKafkaConsumerBase.java:652)
>>> ~[flink-training-exercises-0.15.1.jar:na]
>>> at 
>>> 

Re: Restart hook and checkpoint

2018-03-23 Thread Fabian Hueske
Yes, that would be great!

Thank you, Fabian

2018-03-23 3:06 GMT+01:00 Ashish Pokharel :

> Fabian, that sounds good. Should I recap some bullets in an email and
> start a new thread then?
>
> Thanks, Ashish
>
>
> On Mar 22, 2018, at 5:16 AM, Fabian Hueske  wrote:
>
> Hi Ashish,
>
> Agreed!
> I think the right approach would be to gather the requirements and start a
> discussion on the dev mailing list.
> Contributors and committers who are more familiar with the checkpointing
> and recovery internals should discuss a solution that can be integrated and
> doesn't break with the current mechanism.
> For instance (not sure whether this is feasible or solves the problem) one
> could only do local checkpoints and not write to the distributed persistent
> storage. That would bring down checkpointing costs and the recovery life
> cycle would not need to be radically changed.
>
> Best, Fabian
>
> 2018-03-20 22:56 GMT+01:00 Ashish Pokharel :
>
>> I definitely like the idea of event based checkpointing :)
>>
>> Fabian, I do agree with your point that it is not possible to take a
>> rescue checkpoint consistently. The point here, however, is not about the
>> operator that actually failed. It’s to avoid data loss across 100s
>> (probably 1000s) of parallel operators which are being restarted and are
>> “healthy”. We have 100k (nearing a million soon) elements pushing data.
>> Losing a few seconds' worth of data for a few of them is not good but
>> “acceptable” as long as the damage can be controlled. Of course, we are
>> going to use RocksDB + 2-phase commit with Kafka where we need exactly-once
>> guarantees. The proposal of “fine grained recovery”
>> (https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures)
>> seems like a good start, at least from a damage-control perspective, but
>> even with that it feels like something like an “event based approach” could
>> be done for the subset of the job graph that is “healthy”.
>>
>> Thanks, Ashish
>>
>>
>> On Mar 20, 2018, at 9:53 AM, Fabian Hueske  wrote:
>>
>> Well, that's not that easy to do, because checkpoints must be coordinated
>> and triggered by the JobManager.
>> Also, the checkpointing mechanism with flowing checkpoint barriers (to
>> ensure checkpoint consistency) won't work once a task failed because it
>> cannot continue processing and forward barriers. If the task failed with an
>> OOME, the whole JVM is gone anyway.
>> I don't think it is possible to take something like a consistent rescue
>> checkpoint in case of a failure.
>>
>> It might be possible to checkpoint the application state of non-failed tasks,
>> but this would result in data loss for the failed task, and we would need to
>> weigh the use cases for such a feature against the implementation effort.
>> Maybe there are better ways to address such use cases.
>>
>> Best, Fabian
>>
>> 2018-03-20 6:43 GMT+01:00 makeyang :
>>
>>> Currently there is only a time-based way to trigger a checkpoint. Based on
>>> this discussion, I think Flink needs to introduce an event-based way to
>>> trigger checkpoints; for example, restarting a task manager should count
>>> as an event.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/
>>>
>>
>>
>>
>
>


Re: Example PopularPlacesFromKafka fails to run

2018-03-23 Thread James Yu
Hi,

When I proceed further to the timePrediction exercise
(http://training.data-artisans.com/exercises/timePrediction.html), I realize
that the nycTaxiRides.gz format is fine.
The problem is in TaxiRide.toString(): the columns were serialized in the wrong
order. Hence the data persisted in Kafka has the wrong format.
Therefore I changed TaxiRide.toString() to the following:

  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append(rideId).append(",");
    sb.append(isStart ? "START" : "END").append(",");
    sb.append(startTime.toString(timeFormatter)).append(",");
    sb.append(endTime.toString(timeFormatter)).append(",");
    sb.append(startLon).append(",");
    sb.append(startLat).append(",");
    sb.append(endLon).append(",");
    sb.append(endLat).append(",");
    sb.append(passengerCnt).append(",");
    sb.append(taxiId).append(",");
    sb.append(driverId);

    return sb.toString();
  }



This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275

2018-03-23 9:59 GMT+08:00 James Yu :

> Just figured out the data format in nycTaxiRides.gz doesn't match the
> way TaxiRide.java interprets the lines fed into it.
> Then I checked the exercise training GitHub and found that the TaxiRide.java
> (https://github.com/dataArtisans/flink-training-exercises/tree/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes)
> was recently updated (like 11 days ago).
> After making some changes to TaxiRide.java, the example works like a charm.
>
> I got the nycTaxiRides.gz by issuing this line in console:
> wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
>
> Following are the changes I made to TaxiRide.java locally (basically just
> the indices into the tokens variable):
> try {
> ride.rideId = Long.parseLong(tokens[0]);
>
> switch (tokens[3]) {
> case "START":
> ride.isStart = true;
> ride.startTime = DateTime.parse(tokens[4], timeFormatter);
> ride.endTime = DateTime.parse(tokens[5], timeFormatter);
> break;
> case "END":
> ride.isStart = false;
> ride.endTime = DateTime.parse(tokens[4], timeFormatter);
> ride.startTime = DateTime.parse(tokens[5], timeFormatter);
> break;
> default:
> throw new RuntimeException("Invalid record: " + line);
> }
>
> ride.startLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) :
> 0.0f;
> ride.startLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) :
> 0.0f;
> ride.endLon = tokens[8].length() > 0 ? Float.parseFloat(tokens[8]) : 0.0f;
> ride.endLat = tokens[9].length() > 0 ? Float.parseFloat(tokens[9]) : 0.0f;
> ride.passengerCnt = Short.parseShort(tokens[10]);
> ride.taxiId = Long.parseLong(tokens[1]);
> ride.driverId = Long.parseLong(tokens[2]);
>
> } catch (NumberFormatException nfe) {
> throw new RuntimeException("Invalid record: " + line, nfe);
> }
>
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275
>
> 2018-03-23 8:06 GMT+08:00 James Yu :
>
>> Hi,
>>
>> I fail to run the PopularPlacesFromKafka example with the following
>> exception, and I wonder what might cause this "Invalid record" error?
>>
>> when running within Intellij IDEA -->
>> 07:52:23.960 [Source: Custom Source -> Map (7/8)] INFO
>>  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source ->
>> Map (7/8) (930e95aac65cbda39d9f1eaa41891253) switched from RUNNING to
>> FAILED.
>> java.lang.RuntimeException: Invalid record: 4010,2013003778,2013003775,START,2013-01-01
>> 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
>> at com.dataartisans.flinktraining.exercises.datastream_java.
>> datatypes.TaxiRide.fromString(TaxiRide.java:119)
>> ~[flink-training-exercises-0.15.1.jar:na]
>> at com.dataartisans.flinktraining.exercises.datastream_java.
>> utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37)
>> ~[flink-training-exercises-0.15.1.jar:na]
>> at com.dataartisans.flinktraining.exercises.datastream_java.
>> utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28)
>> ~[flink-training-exercises-0.15.1.jar:na]
>> at org.apache.flink.streaming.util.serialization.KeyedDeseriali
>> zationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
>> ~[flink-training-exercises-0.15.1.jar:na]
>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09
>> Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
>> ~[flink-training-exercises-0.15.1.jar:na]
>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsum
>> erBase.run(FlinkKafkaConsumerBase.java:652)
>> ~[flink-training-exercises-0.15.1.jar:na]
>> at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>> at 
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>> ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
>> at 
>>