Re: Flink takes 40ms ~ 100ms to proceed from one operator to another?

2018-09-20 Thread James Yu
The previous email does not seem to display the embedded images, so let me
include the links instead.

> Hi,
>
> My team and I are trying to measure the total time spent on our Flink
> job, and we found that Flink takes 40ms ~ 100ms to proceed from one
> operator to the next. I wonder how we can reduce this transition time.
>
> The following DAG represents our job:
>
https://drive.google.com/open?id=1wNR8po-SooAfYtCxU3qUDm4-hm16tclV

>
>
> and here is the screenshot of our log:
>
https://drive.google.com/open?id=14PTViZMkhagxeNjOb4R8u3BEr7Ym1xBi

>
> at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
> at 19:37:04.605, the job is entering "Co-Flat Map"
> at 19:37:04.605, the job is leaving "Co-Flat Map"
> at 19:37:04.705, the job is entering "Co-Flat Map -> "
> at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."
>
> both "Co-Flat Map" finishes merely instantly, while most of the execution
> time is spent on the transition. Any idea?
>
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275
> +8615618429976
>


Flink takes 40ms ~ 100ms to proceed from one operator to another?

2018-09-20 Thread James Yu
Hi,

My team and I are trying to measure the total time spent on our Flink
job, and we found that Flink takes 40ms ~ 100ms to proceed from one
operator to the next. I wonder how we can reduce this transition time.

The following DAG represents our job (the embedded image did not come
through; see the links in the reply above):

and here is the screenshot of our log (likewise in the reply above):

at 19:37:04.564, the job is leaving "Source: Custom Source -> Flat Map"
at 19:37:04.605, the job is entering "Co-Flat Map"
at 19:37:04.605, the job is leaving "Co-Flat Map"
at 19:37:04.705, the job is entering "Co-Flat Map -> "
at 19:37:04.708, the job is leaving "Co-Flat Map -> ..."

both "Co-Flat Map" finishes merely instantly, while most of the execution
time is spent on the transition. Any idea?
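
One thing worth checking, as a guess rather than a confirmed diagnosis:
records that cross a chain boundary in Flink travel through network buffers,
which are flushed either when they fill up or when the buffer timeout
expires, and that timeout defaults to 100ms, right in the range observed
above. A minimal sketch of lowering it (trading some throughput for
latency), assuming a standard StreamExecutionEnvironment:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Flush network buffers every 5ms instead of the default 100ms, so records
// reach the next (non-chained) operator sooner, at the cost of more flushes.
env.setBufferTimeout(5);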


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275
+8615618429976


Re: Unsure how to further debug - operator threads stuck on java.lang.Thread.State: WAITING

2018-04-17 Thread James Yu
Miguel, my colleague and I ran into the same problem yesterday.
We were expecting Flink to read 4 inputs from Kafka and write them to
Cassandra, but the operators got stuck after the 1st input was written into
Cassandra.
This is how the DAG looks:
Source: Custom Source -> Map -> (Sink: Unnamed, Sink: Cassandra Sink)
After we disabled automatic chaining (
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#task-chaining-and-resource-groups),
all 4 inputs were read from Kafka and written into Cassandra.
We are still figuring out why the chaining causes the blocking.
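
For reference, a minimal sketch of the workaround described above;
env.disableOperatorChaining() is the blunt instrument, and disableChaining()
on a single operator (the MyMapFunction below is hypothetical) is the
finer-grained variant:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Disable operator chaining for the whole job.
env.disableOperatorChaining();

// Or break the chain at one operator only:
stream.map(new MyMapFunction()).disableChaining();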


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275

2018-04-18 6:57 GMT+08:00 Miguel Coimbra :

> Chesnay, following your suggestions I got access to the web interface and
> also took a closer look at the debugging logs.
> I have noticed one problem regarding the web interface port: it keeps
> changing now and then during my Java program's execution.
>
> Not sure if that is due to my program launching several job executions
> sequentially, but the fact is that it happened.
> Since I am accessing the web interface via tunneling, it becomes rather
> cumbersome to keep adapting it.
>
> Another particular problem I'm noticing is that this exception frequently
> pops up (debugging with log4j):
>
> 00:17:54,368 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Releasing slot with slot request id 9055ef473251505dac04c99727106dc9.
> org.apache.flink.util.FlinkException: Slot is being returned to the SlotPool.
> at org.apache.flink.runtime.jobmaster.slotpool.SlotPool$ProviderAndOwner.returnAllocatedSlot(SlotPool.java:1521)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.lambda$releaseSlot$0(SingleLogicalSlot.java:130)
> at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
> at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:834)
> at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2155)
> at org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot.releaseSlot(SingleLogicalSlot.java:130)
> at org.apache.flink.runtime.executiongraph.Execution.releaseAssignedResource(Execution.java:1239)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:946)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1588)
> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:593)
> at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
> at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> I don't know whether Flink's internals explicitly use exceptions for
> control flow, but there are several occurrences of this as time goes by.
>
> Regarding my program itself, I've made some progress.
> My program needs to run a series of Flink jobs, and extra care is needed
> to make sure no DataSet instance from job *i* is used in an operator of
> job *i + 1*.
> I believe this was generating the waiting scenarios I described in an
> earlier email.
> The bottom line is to be extra careful about when job executions are
> actually triggered and to make sure that a DataSet which will need to be
> used in different Flink jobs is available, for example, as a file in
> secondary storage (possibly masked as a 
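
A minimal sketch of that pattern with the DataSet API, assuming a
hypothetical DataSet named "intermediate" and a temporary path (the exact
masking mentioned above is cut off in the archive):

// Job i: materialize the intermediate result before this job ends.
intermediate.writeAsText("file:///tmp/intermediate", FileSystem.WriteMode.OVERWRITE);
env.execute("job i");

// Job i + 1: read the data back instead of reusing the DataSet instance.
DataSet<String> replay = env.readTextFile("file:///tmp/intermediate");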

Two questions on Flink externalized checkpoints

2018-04-07 Thread James Yu
I left a question on Stack Overflow (
https://stackoverflow.com/questions/49712817/two-questions-on-flink-externalized-checkpoints)
and am asking again on the mailing list in case you check mail more often.

I have two questions on Flink externalized checkpoints

(Q1) I can set "state.checkpoints.dir" in flink-conf.yaml to get
externalized checkpoints to work all right, but how do I achieve the same
thing when I run Flink from the IDE? I tried the GlobalConfiguration
approach mentioned in (
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/state-checkpoints-dir-td17921.html)
but had no luck.

This is how I did it:

Configuration cfg = GlobalConfiguration.loadConfiguration();
cfg.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

and this is the error message shown in the IDE:

Exception in thread "main"
org.apache.flink.runtime.client.JobExecutionException: Failed to submit job
ef7050e2308a4787d983d80f3c07f55c (Long Taxi Rides (checkpointed))
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1325)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:447)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.IllegalStateException: CheckpointConfig says to
persist periodic checkpoints, but no checkpoint directory has been
configured. You can configure configure one via key 'state.checkpoints.dir'.
at
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.<init>(CheckpointCoordinator.java:211)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.enableCheckpointing(ExecutionGraph.java:478)
at
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:291)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1277)
... 19 more

Process finished with exit code 1
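
One approach that may work for (Q1), offered as a sketch rather than a
verified fix: hand the configuration to the local environment explicitly
instead of relying on GlobalConfiguration, since flink-conf.yaml is not read
when running from the IDE:

Configuration conf = new Configuration();
conf.setString("state.checkpoints.dir", "file:///tmp/checkpoints/state");
// Pass the configuration directly to the local environment.
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.createLocalEnvironment(1, conf);
env.enableCheckpointing(10000); // checkpoint every 10 seconds
env.getCheckpointConfig().enableExternalizedCheckpoints(
    CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);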

(Q2) In the checkpointing documentation (
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/checkpointing.html),
it says "This way, you will have a checkpoint around to resume from if your
job fails." What about cancelled jobs? Will a new job carry on from the
existing checkpoint, or will it start with its own checkpoints?


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275


Any way to read Cassandra as DataStream/DataSet in Flink?

2018-03-29 Thread James Yu
Hi,

I tried to treat Cassandra as the source of data in Flink with the
information provided in the following links:
-
https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
-
https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java

I got the AsyncWaitOperator exception when I ran the task. According to the
1st link, this exception occurs due to a network problem. However, the
strange thing is that I am running Cassandra on my local VM with only 10
rows of data in the target table.

@Jicaar in the 1st link also mentions that switching from RichAsyncFunction
to RichMapFunction can avoid the AsyncWaitOperator exception. Can someone
with similar experience share how to do it with a RichMapFunction?
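
A minimal sketch of the RichMapFunction route, assuming the DataStax Java
driver and hypothetical keyspace/table/column names (my_keyspace, my_table,
id, value); the connection is opened once per parallel task in open(), and
each record triggers one synchronous query (imports from
org.apache.flink.api.common.functions, org.apache.flink.configuration, and
com.datastax.driver.core omitted):

public class CassandraLookupMap extends RichMapFunction<String, String> {

  private transient Cluster cluster;
  private transient Session session;

  @Override
  public void open(Configuration parameters) {
    // Build the connection once per task, not once per record.
    cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    session = cluster.connect("my_keyspace");
  }

  @Override
  public String map(String key) {
    // Blocking lookup; no AsyncWaitOperator involved.
    Row row = session.execute(
        "SELECT value FROM my_table WHERE id = ?", key).one();
    return row == null ? null : row.getString("value");
  }

  @Override
  public void close() {
    if (session != null) session.close();
    if (cluster != null) cluster.close();
  }
}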

AsyncWaitOperator exception trace -->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait
operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO
org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> async
wait operator -> (Flat Map, Sink: Unnamed) (1/1)
(2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception.
Failing the AsyncWaitOperator.
  at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by:
org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
Could not forward element to next operator
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at
org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133)
[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException:
java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc
(org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory
(com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate
(com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495)
~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348)
~[kryo-2.24.0.jar:na]
  at
com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializ

Re: Example PopularPlacesFromKafka fails to run

2018-03-23 Thread James Yu
Hi,

When I proceeded further to the timePrediction exercise (
http://training.data-artisans.com/exercises/timePrediction.html), I realized
that the format of nycTaxiRides.gz is fine.
The problem is in TaxiRide.toString(): the columns were serialized in the
wrong order, so the data persisted in Kafka had the wrong format.
I therefore changed TaxiRide.toString() to the following:

  public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(rideId).append(",");
sb.append(isStart ? "START" : "END").append(",");
sb.append(startTime.toString(timeFormatter)).append(",");
sb.append(endTime.toString(timeFormatter)).append(",");
sb.append(startLon).append(",");
sb.append(startLat).append(",");
sb.append(endLon).append(",");
sb.append(endLat).append(",");
sb.append(passengerCnt).append(",");
sb.append(taxiId).append(",");
sb.append(driverId);

return sb.toString();
  }
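
For reference, the record from the error message quoted below,
re-serialized with the ordering above (values taken from that message),
would look like:

4010,START,2013-01-01 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1,2013003778,2013003775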



This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275

2018-03-23 9:59 GMT+08:00 James Yu :

> Just figured out that the data format in nycTaxiRides.gz doesn't match the
> way TaxiRide.java interprets the lines fed into it.
> Then I checked the training exercises GitHub and found that TaxiRide.java (
> https://github.com/dataArtisans/flink-training-exercises/tree/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes)
> was recently updated (about 11 days ago).
> After making some changes to TaxiRide.java, the example works like a charm.
>
> I got the nycTaxiRides.gz by issuing this line in console:
> wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz
>
> The following are the changes I made to TaxiRide.java locally (basically
> just the indexes into the tokens array):
> try {
> ride.rideId = Long.parseLong(tokens[0]);
>
> switch (tokens[3]) {
> case "START":
> ride.isStart = true;
> ride.startTime = DateTime.parse(tokens[4], timeFormatter);
> ride.endTime = DateTime.parse(tokens[5], timeFormatter);
> break;
> case "END":
> ride.isStart = false;
> ride.endTime = DateTime.parse(tokens[4], timeFormatter);
> ride.startTime = DateTime.parse(tokens[5], timeFormatter);
> break;
> default:
> throw new RuntimeException("Invalid record: " + line);
> }
>
> ride.startLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) :
> 0.0f;
> ride.startLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) :
> 0.0f;
> ride.endLon = tokens[8].length() > 0 ? Float.parseFloat(tokens[8]) : 0.0f;
> ride.endLat = tokens[9].length() > 0 ? Float.parseFloat(tokens[9]) : 0.0f;
> ride.passengerCnt = Short.parseShort(tokens[10]);
> ride.taxiId = Long.parseLong(tokens[1]);
> ride.driverId = Long.parseLong(tokens[2]);
>
> } catch (NumberFormatException nfe) {
> throw new RuntimeException("Invalid record: " + line, nfe);
> }
>
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275
>
> 2018-03-23 8:06 GMT+08:00 James Yu :
>
>> Hi,
>>
>> I failed to run the PopularPlacesFromKafka example with the following
>> exception, and I wonder what might cause this "Invalid record" error.
>>
>> when running within IntelliJ IDEA -->
>> 07:52:23.960 [Source: Custom Source -> Map (7/8)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map (7/8) (930e95aac65cbda39d9f1eaa41891253) switched from RUNNING to FAILED.
>> java.lang.RuntimeException: Invalid record: 4010,2013003778,2013003775,START,
>> 2013-01-01 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
>> at com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119) ~[flink-training-exercises-0.15.1.jar:na]
>> at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37) ~[flink-training-exercises-0.15.1.jar:na]
>> at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28) ~[flink-training-exercises-0.15.1.jar:na]
>> at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42) ~[flink-training-exercises-0.15.1.jar:na]
>> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09

Re: Example PopularPlacesFromKafka fails to run

2018-03-22 Thread James Yu
Just figured out that the data format in nycTaxiRides.gz doesn't match the
way TaxiRide.java interprets the lines fed into it.
Then I checked the training exercises GitHub and found that TaxiRide.java (
https://github.com/dataArtisans/flink-training-exercises/tree/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/datatypes)
was recently updated (about 11 days ago).
After making some changes to TaxiRide.java, the example works like a charm.

I got the nycTaxiRides.gz by issuing this line in console:
wget http://training.data-artisans.com/trainingData/nycTaxiRides.gz

The following are the changes I made to TaxiRide.java locally (basically
just the indexes into the tokens array):
try {
    ride.rideId = Long.parseLong(tokens[0]);

    switch (tokens[3]) {
        case "START":
            ride.isStart = true;
            ride.startTime = DateTime.parse(tokens[4], timeFormatter);
            ride.endTime = DateTime.parse(tokens[5], timeFormatter);
            break;
        case "END":
            ride.isStart = false;
            ride.endTime = DateTime.parse(tokens[4], timeFormatter);
            ride.startTime = DateTime.parse(tokens[5], timeFormatter);
            break;
        default:
            throw new RuntimeException("Invalid record: " + line);
    }

    ride.startLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
    ride.startLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
    ride.endLon = tokens[8].length() > 0 ? Float.parseFloat(tokens[8]) : 0.0f;
    ride.endLat = tokens[9].length() > 0 ? Float.parseFloat(tokens[9]) : 0.0f;
    ride.passengerCnt = Short.parseShort(tokens[10]);
    ride.taxiId = Long.parseLong(tokens[1]);
    ride.driverId = Long.parseLong(tokens[2]);

} catch (NumberFormatException nfe) {
    throw new RuntimeException("Invalid record: " + line, nfe);
}
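
For reference, mapping the failing record from the error message quoted
below onto these token indexes (derived from the parsing code above):

// 4010,2013003778,2013003775,START,2013-01-01 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
// tokens[0]  = 4010                -> rideId
// tokens[1]  = 2013003778          -> taxiId
// tokens[2]  = 2013003775          -> driverId
// tokens[3]  = START               -> isStart
// tokens[4]  = 2013-01-01 00:13:00 -> startTime (endTime for END records)
// tokens[5]  = 1970-01-01 00:00:00 -> endTime (startTime for END records)
// tokens[6..9]                     -> startLon, startLat, endLon, endLat
// tokens[10] = 1                   -> passengerCnt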


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275

2018-03-23 8:06 GMT+08:00 James Yu :

> Hi,
>
> I failed to run the PopularPlacesFromKafka example with the following
> exception, and I wonder what might cause this "Invalid record" error.
>
> when running within IntelliJ IDEA -->
> 07:52:23.960 [Source: Custom Source -> Map (7/8)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map (7/8) (930e95aac65cbda39d9f1eaa41891253) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Invalid record: 4010,2013003778,2013003775,START,
> 2013-01-01 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
> at com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119) ~[flink-training-exercises-0.15.1.jar:na]
> at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37) ~[flink-training-exercises-0.15.1.jar:na]
> at com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28) ~[flink-training-exercises-0.15.1.jar:na]
> at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42) ~[flink-training-exercises-0.15.1.jar:na]
> at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139) ~[flink-training-exercises-0.15.1.jar:na]
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652) ~[flink-training-exercises-0.15.1.jar:na]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) ~[flink-runtime_2.11-1.4.2.jar:1.4.2]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
>
> when deployed to and run on a local cluster -->
> 2018-03-23 07:27:23.130 [Source: Custom Source -> Map (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map (1/1) (db21c7604b94968097d4be7b8558ac08) switched from RUNNING to FAILED.
> java.lang.RuntimeException: Invalid record: 2264,2013002216,2013002213,START,
> 2013-01-01 00:09:00,1970-01-01 00:00:00,-74.00402,40.742107,-73.98032,40.73522,1
> at com.dataartisans.flinktraining.exercises.datastream_jav

Example PopularPlacesFromKafka fails to run

2018-03-22 Thread James Yu
Hi,

I failed to run the PopularPlacesFromKafka example with the following
exception, and I wonder what might cause this "Invalid record" error.

when running within IntelliJ IDEA -->
07:52:23.960 [Source: Custom Source -> Map (7/8)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map (7/8) (930e95aac65cbda39d9f1eaa41891253) switched from RUNNING to FAILED.
java.lang.RuntimeException: Invalid record: 4010,2013003778,2013003775,START,
2013-01-01 00:13:00,1970-01-01 00:00:00,-74.00074,40.7359,-73.98559,40.739063,1
at
com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119)
~[flink-training-exercises-0.15.1.jar:na]
at
com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37)
~[flink-training-exercises-0.15.1.jar:na]
at
com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28)
~[flink-training-exercises-0.15.1.jar:na]
at
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
~[flink-training-exercises-0.15.1.jar:na]
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
~[flink-training-exercises-0.15.1.jar:na]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652)
~[flink-training-exercises-0.15.1.jar:na]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
~[flink-runtime_2.11-1.4.2.jar:1.4.2]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]

when deployed to and run on a local cluster -->
2018-03-23 07:27:23.130 [Source: Custom Source -> Map (1/1)] INFO  org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> Map (1/1) (db21c7604b94968097d4be7b8558ac08) switched from RUNNING to FAILED.
java.lang.RuntimeException: Invalid record: 2264,2013002216,2013002213,START,
2013-01-01 00:09:00,1970-01-01 00:00:00,-74.00402,40.742107,-73.98032,40.73522,1
at
com.dataartisans.flinktraining.exercises.datastream_java.datatypes.TaxiRide.fromString(TaxiRide.java:119)
~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
at
com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:37)
~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
at
com.dataartisans.flinktraining.exercises.datastream_java.utils.TaxiRideSchema.deserialize(TaxiRideSchema.java:28)
~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
at
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
at
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652)
~[blob_p-bc9bbda0acf1e77543b16e023cbf2466bbe42a5d-ac2bf6479faa39e6b27477bb80f66178:na]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
~[flink-dist_2.11-1.4.2.jar:1.4.2]
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
~[flink-dist_2.11-1.4.2.jar:1.4.2]
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
~[flink-dist_2.11-1.4.2.jar:1.4.2]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
~[flink-dist_2.11-1.4.2.jar:1.4.2]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
~[flink-dist_2.11-1.4.2.jar:1.4.2]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]

I copied the PopularPlacesFromKafka.java from
https://raw.githubusercontent.com/dataArtisans/flink-training-exercises/master/src/main/java/com/dataartisans/flinktraining/exercises/datastream_java/connectors/PopularPlacesFromKafka.java


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275


how does SQL mode work with PopularPlaces example?

2018-03-21 Thread James Yu
Hi,

I am following the PopularPlacesSQL example (
http://training.data-artisans.com/exercises/popularPlacesSql.html), but I
am unable to understand why the following statement will pickup events with
START flag only.

"SELECT " +
"toCoords(cell), wstart, wend, isStart, popCnt " +
"FROM " +
"(SELECT " +
"cell, " +
"isStart, " +
"HOP_START(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wstart,
" +
"HOP_END(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wend, " +
"COUNT(isStart) AS popCnt " +
"FROM " +
"(SELECT " +
"eventTime, " +
"isStart, " +
"CASE WHEN isStart THEN toCellId(startLon, startLat) ELSE toCellId(endLon,
endLat) END AS cell " +
"FROM TaxiRides " +
"WHERE isInNYC(startLon, startLat) AND isInNYC(endLon, endLat)) " +
"GROUP BY cell, isStart, HOP(eventTime, INTERVAL '5' MINUTE, INTERVAL '15'
MINUTE)) " +
"WHERE popCnt > 20"

Since we can update state in processElement when we do it with the low-level
ProcessFunction, how does SQL rule out the unpaired events?
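
For what it's worth, a reading of the query above rather than an
authoritative answer: nothing in it restricts the result to START events.
The inner CASE assigns START events the cell of their start location and
END events the cell of their end location, and GROUP BY cell, isStart
counts the two kinds in separate groups, so END events are not ruled out;
they surface with isStart = false. If only START events were wanted, a
sketch of the extra outer filter (not part of the exercise) would be:

"WHERE popCnt > 20 AND isStart"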


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275


Re: unable to addSource to StreamExecutionEnvironment?

2018-03-20 Thread James Yu
Just found out that the IDE seems to have auto-imported the wrong class.
While "org.apache.flink.streaming.api.datastream.DataStream" is required,
"org.apache.flink.streaming.api.scala.DataStream" was imported.

This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275

2018-03-20 16:55 GMT+08:00 James Yu :

> Hi,
>
> I am following the Taxi example provided on
> "http://training.data-artisans.com/exercises/taxiData.html", however, I
> got the following error message when I copied the addSource line into my
> IntelliJ IDE.
>
> error message -->
> Incompatible types. Required DataStream<TaxiRide> but 'addSource' was
> inferred to DataStreamSource<OUT>: no instance(s) of type variable(s) OUT
> exist so that DataStreamSource<OUT> conforms to DataStream<TaxiRide>
>
> addSource line -->
> DataStream<TaxiRide> rides = env.addSource(
>   new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));
>
> My question would be: how do we call addSource in the latest Flink? I am
> running flink-1.4.2.
>
>
> This is a UTF-8 formatted mail
> ---
> James C.-C.Yu
> +886988713275
>


unable to addSource to StreamExecutionEnvironment?

2018-03-20 Thread James Yu
Hi,

I am following the Taxi example provided on
"http://training.data-artisans.com/exercises/taxiData.html", however, I got
the following error message when I copied the addSource line into my
IntelliJ IDE.

error message -->
Incompatible types. Required DataStream<TaxiRide> but 'addSource' was
inferred to DataStreamSource<OUT>: no instance(s) of type variable(s) OUT
exist so that DataStreamSource<OUT> conforms to DataStream<TaxiRide>

addSource line -->
DataStream<TaxiRide> rides = env.addSource(
  new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));

My question would be: how do we call addSource in the latest Flink? I am
running flink-1.4.2.
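
For reference, a sketch of the same line with the type parameter spelled
out, assuming the training project's TaxiRide and TaxiRideSource classes
are on the classpath (per the reply above, the actual error came from
importing the Scala DataStream instead of the Java one):

DataStream<TaxiRide> rides = env.addSource(
    new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));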


This is a UTF-8 formatted mail
---
James C.-C.Yu
+886988713275