Re: Beam Application run on cluster setup (Kafka+Flink)

2017-10-12 Thread Shankara
Hi,

I mean that the same code works fine in a local Flink setup. I can see
"Received Message  from testkafka Topic : " on the console when Kafka
receives a message (the Kafka producer is on another machine and frequently
sends messages to the testkafka topic).

 *Submitted the Beam application to the local Flink setup with the command below:*
 mvn compile exec:java
-Dexec.mainClass=org.apache.beam.influxdb.KafkaRead  -Pflink-runner

 *Output is :*
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#735957608]
with leader session id d97c060d-bdf9-4215-8d7c-138f13cbff1e.
10/13/2017 11:09:09 Job execution switched to status RUNNING.
10/13/2017 11:09:09 Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
10/13/2017 11:09:09 Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
10/13/2017 11:09:09 Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 
*Received in Deserilize..
Received Message  from testkafka Topic : HELLOASA*



If I run the same code on the Flink cluster I cannot see any message in the
log/stdout, but the job keeps running and the Kafka producer on the other
machine keeps sending messages to the testkafka topic.

   *I started the Flink cluster with the command below:*
   bin/start-cluster.sh

   *Submitted the Beam application to the Flink cluster with the command below:*
  bin/flink run -c org.apache.beam.influxdb.KafkaRead
/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar
--runner=FlinkRunner --flinkMaster=192.168.1.116
--filesToStage=/home/root1/NAI/US_Working/NAI_KPI/Kafka_Proto_Sub/target/kafka-proto-sub-bundled-2.1.0.jar


   In the dashboard (screenshots not preserved in the archive):

I cannot see any message in the dashboard.


   According to the log, the job execution is running:
Cluster configuration: Standalone cluster with JobManager at
/192.168.1.116:6123
Using address 192.168.1.116:6123 to connect to JobManager.
JobManager web interface address http://192.168.1.116:8081
Starting execution of program
Submitting job with JobID: 8d731f801d00268f951a98d093f21e0c. Waiting for job
completion.
Connected to JobManager at
Actor[akka.tcp://flink@192.168.1.116:6123/user/jobmanager#422012792] with
leader session id ----.
10/13/2017 11:10:57 Job execution switched to status RUNNING.
10/13/2017 11:10:57 Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to SCHEDULED 
10/13/2017 11:10:57 Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to DEPLOYING 
10/13/2017 11:11:05 Source: Read(UnboundedKafkaSource) -> Flat Map ->
ParMultiDo(Anonymous) -> ParMultiDo(PrintMsg)(1/1) switched to RUNNING 

   There is no exception in the log. I suspect there is an issue with the Kafka
deployment.

Can you please help me check this?




 public static void main(String[] args) {
     Pipeline p = initializePipeline(args);
     Map intelliOmIms = new TreeMap<>();

     PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
     reader = KafkaIO.<Integer, byte[]>read()
             .withBootstrapServers("192.168.1.116:9092")   // ---> Kafka zookeeper and server are running here
             .withTopic("kafkatest")
             .withKeyDeserializer(IntegerDeserializer.class)
             .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
             .withoutMetadata();

     PCollection<KV<Integer, byte[]>> output = p.apply(reader);
     output.apply(ParDo.of(new PrintMsg()));

     p.run().waitUntilFinish();
 }

 public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

     @ProcessElement
     public void processElement(ProcessContext c) {
         try {
             System.out.println("Received Message  from testkafka Topic : "
                     + new String(c.element().getValue(), "UTF-8"));
         } catch (UnsupportedEncodingException e) {
             e.printStackTrace();
         }
     }
 }
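
For what it's worth: when the job runs on a cluster, System.out from the DoFn goes
to the TaskManager's *.out file on the worker machine, not to the client console
that submitted the job. A variant of PrintMsg that writes through SLF4J instead,
so the message shows up in the TaskManager log shown by the web dashboard, could
look like the sketch below (only a sketch; the class name is made up):

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingPrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

    // SLF4J output ends up in the TaskManager log files on the cluster
    private static final Logger LOG = LoggerFactory.getLogger(LoggingPrintMsg.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("Received Message from testkafka Topic : {}",
                new String(c.element().getValue(), StandardCharsets.UTF_8));
    }
}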



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Timer coalescing necessary?

2017-10-12 Thread Kien Truong
Hi,

We have a streaming job where we use timers to implement key timeouts for
stateful functions. Should we implement coalescing logic to reduce the number
of timer triggers, or is that not necessary with Flink?

Best regards,
Kien
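
For context, the usual way to coalesce timers is to round the target timestamp
down to a coarser resolution, so that many events for the same key register the
same timer (timers with identical key and timestamp are deduplicated). A minimal
sketch; the 60 s timeout and 1 s resolution are arbitrary assumptions:

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class KeyTimeoutFunction extends ProcessFunction<String, String> {

    private static final long TIMEOUT_MS = 60_000;    // assumed key timeout
    private static final long RESOLUTION_MS = 1_000;  // coalescing resolution

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        // Rounding down means all elements of a key arriving within the same
        // second re-register the same timer instead of one timer per element.
        long target = ctx.timerService().currentProcessingTime() + TIMEOUT_MS;
        long coalesced = (target / RESOLUTION_MS) * RESOLUTION_MS;
        ctx.timerService().registerProcessingTimeTimer(coalesced);
        out.collect(value);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        // key timeout handling goes here
    }
}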

Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Colin Williams
The team wants an integration test; I'm not sure what unit test you had in
mind. Actually, I feel I've been trying to avoid the reporter method, but
that would be more end-to-end.

The documentation for metrics in Scala is missing, with the exception of
Gauge:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html
Should I file an issue against that?

That leaves you guessing a little about how to implement Counters. One
approach I tried was using objects:

object PointFilter extends RichMapFunction[...

  @transient lazy val someCounter =
    getRuntimeContext.getMetricGroup.counter(...)


This allowed access to the counter before and after execution. However, the
Counter also kept its value between the unit tests, and that's a no for the
test. I think that might be an issue with ScalaTest.

I've tried to get at the counter from some other directions, like trying to
find a way to inject a reporter to read its state, but I don't see a way to
do it. So probably the best thing to do is fire up something to collect the
metrics from the reporter.
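
For what it's worth, the cross-test leakage above should go away if the counter
is registered per operator instance in open() rather than held in a companion
object; a minimal sketch in Java (names made up):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

public class PointFilterFn extends RichMapFunction<String, String> {

    // one Counter per parallel operator instance, created when the task starts
    private transient Counter pointsSeen;

    @Override
    public void open(Configuration parameters) throws Exception {
        pointsSeen = getRuntimeContext().getMetricGroup().counter("pointsSeen");
    }

    @Override
    public String map(String value) throws Exception {
        pointsSeen.inc();
        return value;
    }
}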

On Thu, Oct 12, 2017 at 5:29 AM, Chesnay Schepler wrote:

> Well damn, i should've read the second part of the initial mail.
>
> I'm wondering though, could you not unit-test this behavior?
>
>
> On 12.10.2017 14:25, Chesnay Schepler wrote:
>
>> You could also write a custom reporter that opens a socket or similar for
>> communication purposes.
>>
>> You can then either query it for the metrics, or even just trigger the
>> verification in the reporter,
>> and fail with an error if the reporter returns an error.
>>
>> On 12.10.2017 14:02, Piotr Nowojski wrote:
>>
>>> Hi,
>>>
>>> Doing as you proposed using JMXReporter (or custom reporter) should
>>> work. I think there is no easier way to do this at the moment.
>>>
>>> Piotrek
>>>
>>> On 12 Oct 2017, at 04:58, Colin Williams  wrote:

 I have a RichMapFunction and I'd like to ensure Meter fields are
 properly incremented. I've been trying to think of the best way to do this.
 Currently I think that I'd need to either implement my own reporter (or use
 JMX) and write to a socket, create a listener and wait for the reporter to
 send the message.

 Is this a good approach for writing the test, or should I be
 considering something else?

>>>
>>>
>>
>>
>
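
A bare-bones reporter along the lines Chesnay describes, kept in memory for a
local MiniCluster-based test, might look like this sketch (the class name is
made up; the reporter must be registered under its fully-qualified name):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

public class InMemoryTestReporter implements MetricReporter {

    // static so the test (same JVM as a local MiniCluster) can inspect registered metrics
    public static final Map<String, Metric> METRICS = new ConcurrentHashMap<>();

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.put(metricName, metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.remove(metricName);
    }
}

// enabled via configuration, e.g. in flink-conf.yaml:
//   metrics.reporters: inmem
//   metrics.reporter.inmem.class: <package>.InMemoryTestReporter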


Re: Task Manager was lost/killed due to full GC

2017-10-12 Thread ShB
On further investigation, it seems to me that the I/O exception I posted
previously is not the cause of the TM being lost; it's the after-effect of the
TM being shut down and the channel being closed after a record is emitted but
before it's processed.

Previously, the logs didn't throw up this error, and I'm also unable to
reproduce it every time (I've come across the I/O exception twice so far).
Most of the time, the logs don't contain the I/O exception or any other
exception/error messages.

This is what the logs usually look like (without the I/O exception):
Job Manager:
/
2017-10-12 22:22:41,857 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
   
- Container container_1507845873691_0001_01_08 failed. Exit status: -100
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
   
- Diagnostics for container container_1507845873691_0001_01_08 in state
COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
   
- Total number of failed containers so far: 1
2017-10-12 22:22:41,858 INFO  org.apache.flink.yarn.YarnFlinkResourceManager
   
- Requesting new TaskManager container with 22000 megabytes memory. Pending
requests: 1
2017-10-12 22:22:42,096 INFO  org.apache.flink.yarn.YarnJobManager  
   
- Task manager akka.tcp://flink@ip-172-31-43-115:43404/user/taskmanager
terminated.
2017-10-12 22:22:42,210 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- CHAIN
Partition -> FlatMap (FlatMap at main(FindOutput.java:83)) -> Filter (Filter
at main(FindOutput.java:85)) (39/96) (530ca4789a921cab363f241176dac7a8)
switched from RUNNING to FAILED.
java.lang.Exception: TaskManager was lost/killed:
container_1507845873691_0001_01_08 @
ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
at
org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
at
org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:533)
at
org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
at 
org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
at
org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
at
org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1228)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:474)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at
org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:100)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at
org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:103)
at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
at
org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:38)
at
scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at 
org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at
org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at
org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:125)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2017-10-12 22:22:42,451 INFO 
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Output
(0c45ba62b56fefd1c1e7bfd68923411d) switched from state RUNNING to FAILING.
java.lang.Exception: TaskManager was lost/killed:
container_1507845873691_0001_01_08 @
ip-172-31-43-115.us-west-2.compute.internal (dataPort=40747)
at

Re: Task Manager was lost/killed due to full GC

2017-10-12 Thread ShB
Hi Stephan,

Apologies, I hit send too soon on the last email. 

So, while trying to debug this, I ran it multiple times on different
instance types(to increase RAM available) and while digging into the logs, I
found this to be the error in the task manager logs:

/
java.lang.RuntimeException: Emitting the record caused an I/O exception: I/O
channel already closed. Could not fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:69)
at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at
org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:51)
at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:104)
at
org.processing.core.ProcessingJob$ReadFiles.flatMap(ProcessingJob.java:89)
at
org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
at
org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:90)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:490)
at 
org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:355)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: I/O channel already closed. Could not
fulfill:
org.apache.flink.runtime.io.disk.iomanager.BufferWriteRequest@12b3c49e
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.addRequest(AsynchronousFileIOChannel.java:249)
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:36)
at
org.apache.flink.runtime.io.disk.iomanager.AsynchronousBufferFileWriter.writeBlock(AsynchronousBufferFileWriter.java:26)
at
org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:111)
at
org.apache.flink.runtime.io.network.partition.ResultPartition.add(ResultPartition.java:278)
at
org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter.writeBuffer(ResultPartitionWriter.java:72)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.writeAndClearBuffer(RecordWriter.java:223)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:121)
at
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:89)
at
org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
... 13 more
/

Any idea on a fix for this issue? I can't seem to find any further
information on this in the mailing lists.

Thank you.





--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Task Manager was lost/killed due to full GC

2017-10-12 Thread ShB
Hi Stephan,

Thanks for your response!

Task manager lost/killed has been a recurring problem I've had with Flink
for the last few months, as I try to scale to larger and larger amounts of
data. I would be very grateful for some help figuring out how I can avoid
this. 

The program is set up something like this:
/
DataSet<CustomType> data = env.fromCollection(listOfFiles)
        .rebalance()
        .flatMap(new ReadFiles())
        .filter(new FilterData());

DataSet computation1 = data
        .map(new Compute1())
        .distinct()
        .map(new Compute2())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet computation2 = data
        .map(new Compute3())
        .distinct()
        .map(new Compute4())
        .groupBy(0, 1, 2)
        .aggregate(SUM, 3).and(SUM, 4).and(SUM, 5);

DataSet finalOP = computation1.join(computation2)
        .where(0, 1)
        .equalTo(0, 1)
        .with(new Join1())
        .sortPartition(0, Order.ASCENDING)
        .setParallelism(1);

finalOP.writeAsCsv("s3://myBucket/myKey.csv");

---

public static final class ReadFiles implements FlatMapFunction<String, CustomType> {
    @Override
    public void flatMap(String fileName, Collector<CustomType> out) throws Exception {
        S3FileReaderAndParser parser = new S3FileReaderAndParser(fileName);
        List<CustomType> dataList = parser.parseFiles();
        for (CustomType data : dataList) {
            out.collect(data);
        }
    }
}
/

The Task Manager is killed/lost during the ReadFiles() flatMap. ReadFiles is a
flatMap function that reads each of the files from S3 using the AWS S3 Java
SDK, then parses and emits each of the protobufs.
And yes, I can find a message like this in the logs about "gated" systems:
2017-10-12 20:46:00,355 WARN  akka.remote.ReliableDeliverySupervisor
   
- Association with remote system [akka.tcp://flink@ip-172-31-8-29:38763] has
failed, address is now gated for [5000] ms. Reason: [Association failed with
[akka.tcp://flink@ip-172-31-8-29:38763]] Caused by: [Connection refused:
ip-172-31-8-29/172.31.8.29:38763]

Thank you!




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-10-12 Thread Aljoscha Krettek
I think I finally found the problem; there was already another bug report
for this: https://issues.apache.org/jira/browse/FLINK-7484

> On 12. Oct 2017, at 18:22, Federico D'Ambrosio 
>  wrote:
> 
> Hi Aljoscha, 
> 
> yes, just like you're guessing, without asynchronous checkpoints, there has 
> been no crash so far.
> 
> Regards,
> Federico
> 
> 2017-10-12 18:08 GMT+02:00 Aljoscha Krettek  >:
> Hi Federico,
> 
> I'm guessing the job is still working without asynchronous watermarks? I'm 
> very eager to figure out what is actually going wrong with asynchronous 
> checkpoints.
> 
> Best,
> Aljoscha
> 
> 
>> On 2. Oct 2017, at 11:57, Federico D'Ambrosio 
>> > 
>> wrote:
>> 
>> As a followup:
>> 
>> the flink job has currently an uptime of almost 24 hours, with no checkpoint 
>> failed or restart whereas, with async snapshots, it would have already 
>> crashed 50 or so times.
>> 
>> Regards,
>> Federico
>> 
>> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio 
>> >:
>> Thank you very much, Gordon.
>> 
>> I'll try to run the job without the asynchronous snapshots first thing.
>> 
>> As for the Event data type: it's a case class with 2 fields: a String ID and 
>> a composite case class (let's call it RealEvent) containing 3 fields of the 
>> following types: Information, which is a case class with String fields, 
>> Coordinates, a nested case class with 2 Double and InstantValues, with 3 
>> Integers and a DateTime.This DateTime field in InstantValues is the one 
>> being evalued in the maxBy (via InstantValues and RealEvent compareTo 
>> implementations, because dot notation is not working in scala as of 1.3.2, 
>> FLINK-7629 ) and that was 
>> the reason in the first place I had to register the JodaDateTimeSerializer 
>> with Kryo.
>> 
>> Regards,
>> Federico
>> 
>> 
>> 
>> 
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai > >:
>> Hi,
>> 
>> Thanks for the extra info, it was helpful (I’m not sure why your first logs 
>> didn’t have the full trace, though).
>> 
>> I spent some time digging through the error trace, and currently have some 
>> observations I would like to go through first:
>> 
>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while 
>> trying to access the state and making a copy (via serialization) in the 
>> CopyOnWriteStateTable.
>> 2. The state that caused the exception seems to be the state of the reducing 
>> window function (i.e. the maxBy). The state type should be the same as the 
>> records in your `events` DataStream, which seems to be a Scala case class 
>> with some nested field that requires Kryo for serialization.
>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying 
>> to copy that field ..
>> 
>> My current guess would perhaps be that the serializer internally used may 
>> have been incorrectly shared, which is probably why this exception happens 
>> randomly for you.
>> I recall that there were similar issues that occurred before due to the fact 
>> that some KryoSerializers aren't thread-safe and was incorrectly shared in 
>> Flink.
>> 
>> I may need some help from you to be able to look at this a bit more:
>> - Is it possible that you disable asynchronous snapshots and try running 
>> this job a bit more to see if the problem still occurs? This is mainly to 
>> eliminate my guess on whether or not there is some incorrect serializer 
>> usage in the CopyOnWriteStateTable.
>> - Could you let us know what your `events` DataStream records type case 
>> class looks like?
>> 
>> Also looping in Aljoscha and Stefan here, as they would probably have more 
>> insights in this.
>> 
>> Cheers,
>> Gordon
>> 
>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio 
>> (federico.dambro...@smartlab.ws ) 
>> wrote:
>> 
>>> Hi Gordon,
>>> 
>>> I remembered that I had already seen this kind of exception once during the 
>>> testing of the current job and fortunately I had the complete stacktrace 
>>> still saved on my pc:
>>> 
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>> at 
>>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>>> at 
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>>> at 
>>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>>> at 
>>> 

Re: Writing to an HDFS file from a Flink stream job

2017-10-12 Thread Isuru Suriarachchi
Thanks for all your directions. BucketingSink worked.

Isuru
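
For anyone landing on this thread later, a minimal BucketingSink setup along the
lines suggested below might look like this (the path comes from earlier in the
thread; the bucketer and batch size are arbitrary choices):

import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// stream is the DataStream<String> that was previously written with writeAsText()
BucketingSink<String> sink = new BucketingSink<>("hdfs://hadoop-master:9000/user/isuru/foo");
sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH"));  // one directory per hour
sink.setBatchSize(1024 * 1024 * 64);                               // roll part files at 64 MB
stream.addSink(sink);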

On Thu, Oct 12, 2017 at 9:05 AM, Piotr Nowojski 
wrote:

> I think the issue might be that writeAsText (TextOutputFormat) doesn’t
> flush the data anywhere (only on close, which in streaming doesn’t happen).
> You would need to use custom output format, but as Aljoscha pointed out
> BucketingSink makes more sense for streaming applications.
>
> Piotrek
>
> On 12 Oct 2017, at 14:58, Aljoscha Krettek  wrote:
>
> Hi Isuru,
>
> What is the source in your job and is the job terminating at some point or
> running continuously?
>
> In general, the writeAsText()/writeAsCsv() methods should not be used
> because they don't work well in an infinite streaming job that might have
> failures and recovery. I.e. what does that mean for the file, if you have
> recovery. For writing to files you would use the BucketingSink:
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/
> connectors/filesystem_sink.html
>
> Best,
> Aljoscha
>
> On 12. Oct 2017, at 14:55, Piotr Nowojski  wrote:
>
> Hi,
>
> Maybe this is an access rights issue? Could you try to create and write to
> same file (same directory) in some other way (manually?), using the same
> user and the same machine as would Flink job do?
>
> Maybe there will be some hint in hdfs logs?
>
> Piotrek
>
> On 12 Oct 2017, at 00:19, Isuru Suriarachchi  wrote:
>
> Hi all,
>
> I'm just trying to use an HDFS file as the sink for my flink stream job. I
> use the following line to do so.
>
> stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");
>
>
> I have not set "fs.hdfs.hadoopconf" in my flink configuration as it should
> work with the full hdfs file name according to [1].
>
> However, it doesn't work as expected. File foo is created on hdfs. But
> that file is empty. But I don't see any error logs too on Flink side. When
> I used a normal file sink using a "file:///.." url, it works fine and
> data is there in the file.
>
> Do I need any other configuration to get his working?
>
> Thanks,
> Isuru
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.3/setup/config.html#hdfs
>
>
>
>
>


Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-10-12 Thread Federico D'Ambrosio
Hi Aljoscha,

yes, just like you're guessing, without asynchronous checkpoints, there has
been no crash so far.

Regards,
Federico

2017-10-12 18:08 GMT+02:00 Aljoscha Krettek :

> Hi Federico,
>
> I'm guessing the job is still working without asynchronous watermarks? I'm
> very eager to figure out what is actually going wrong with asynchronous
> checkpoints.
>
> Best,
> Aljoscha
>
>
> On 2. Oct 2017, at 11:57, Federico D'Ambrosio <
> federico.dambro...@smartlab.ws> wrote:
>
> As a followup:
>
> the flink job has currently an uptime of almost 24 hours, with no
> checkpoint failed or restart whereas, with async snapshots, it would have
> already crashed 50 or so times.
>
> Regards,
> Federico
>
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio <
> federico.dambro...@smartlab.ws>:
>
>> Thank you very much, Gordon.
>>
>> I'll try to run the job without the asynchronous snapshots first thing.
>>
>> As for the Event data type: it's a case class with 2 fields: a String ID
>> and a composite case class (let's call it RealEvent) containing 3 fields of
>> the following types: Information, which is a case class with String fields,
>> Coordinates, a nested case class with 2 Double and InstantValues, with 3
>> Integers and a DateTime.This DateTime field in InstantValues is the one
>> being evalued in the maxBy (via InstantValues and RealEvent compareTo
>> implementations, because dot notation is not working in scala as of 1.3.2,
>> FLINK-7629 ) and that
>> was the reason in the first place I had to register the
>> JodaDateTimeSerializer with Kryo.
>>
>> Regards,
>> Federico
>>
>>
>>
>>
>> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai :
>>
>>> Hi,
>>>
>>> Thanks for the extra info, it was helpful (I’m not sure why your first
>>> logs didn’t have the full trace, though).
>>>
>>> I spent some time digging through the error trace, and currently have
>>> some observations I would like to go through first:
>>>
>>> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while
>>> trying to access the state and making a copy (via serialization) in the
>>> CopyOnWriteStateTable.
>>> 2. The state that caused the exception seems to be the state of the
>>> reducing window function (i.e. the maxBy). The state type should be the
>>> same as the records in your `events` DataStream, which seems to be a Scala
>>> case class with some nested field that requires Kryo for serialization.
>>> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when
>>> trying to copy that field ..
>>>
>>> My current guess would perhaps be that the serializer internally used
>>> may have been incorrectly shared, which is probably why this exception
>>> happens randomly for you.
>>> I recall that there were similar issues that occurred before due to the
>>> fact that some KryoSerializers aren't thread-safe and was incorrectly
>>> shared in Flink.
>>>
>>> I may need some help from you to be able to look at this a bit more:
>>> - Is it possible that you disable asynchronous snapshots and try running
>>> this job a bit more to see if the problem still occurs? This is mainly to
>>> eliminate my guess on whether or not there is some incorrect serializer
>>> usage in the CopyOnWriteStateTable.
>>> - Could you let us know what your `events` DataStream records type case
>>> class looks like?
>>>
>>> Also looping in Aljoscha and Stefan here, as they would probably have
>>> more insights in this.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio (
>>> federico.dambro...@smartlab.ws) wrote:
>>>
>>> Hi Gordon,
>>>
>>> I remembered that I had already seen this kind of exception once during
>>> the testing of the current job and fortunately I had the complete
>>> stacktrace still saved on my pc:
>>>
>>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:15
>>> 7)
>>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>>> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSeriali
>>> zer.copy(KryoSerializer.java:176)
>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>>> y(CaseClassSerializer.scala:101)
>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>>> y(CaseClassSerializer.scala:32)
>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>>> y(CaseClassSerializer.scala:101)
>>> at org.apache.flink.api.scala.typeutils.CaseClassSerializer.cop
>>> y(CaseClassSerializer.scala:32)
>>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.ge
>>> t(CopyOnWriteStateTable.java:279)
>>> at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.ge
>>> t(CopyOnWriteStateTable.java:296)
>>> at 

Re: ArrayIndexOutOfBoundExceptions while processing valve output watermark and while applying ReduceFunction in reducing state

2017-10-12 Thread Aljoscha Krettek
Hi Federico,

I'm guessing the job is still working without asynchronous watermarks? I'm very 
eager to figure out what is actually going wrong with asynchronous checkpoints.

Best,
Aljoscha
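
For anyone else following along, disabling asynchronous snapshots for the
heap-based FsStateBackend is just a constructor flag; a sketch (the checkpoint
URI is a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// second constructor argument = asynchronousSnapshots; false forces synchronous snapshots
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints", false));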


> On 2. Oct 2017, at 11:57, Federico D'Ambrosio 
>  wrote:
> 
> As a followup:
> 
> the flink job has currently an uptime of almost 24 hours, with no checkpoint 
> failed or restart whereas, with async snapshots, it would have already 
> crashed 50 or so times.
> 
> Regards,
> Federico
> 
> 2017-09-30 19:01 GMT+02:00 Federico D'Ambrosio 
> >:
> Thank you very much, Gordon.
> 
> I'll try to run the job without the asynchronous snapshots first thing.
> 
> As for the Event data type: it's a case class with 2 fields: a String ID and 
> a composite case class (let's call it RealEvent) containing 3 fields of the 
> following types: Information, which is a case class with String fields, 
> Coordinates, a nested case class with 2 Double and InstantValues, with 3 
> Integers and a DateTime.This DateTime field in InstantValues is the one being 
> evalued in the maxBy (via InstantValues and RealEvent compareTo 
> implementations, because dot notation is not working in scala as of 1.3.2, 
> FLINK-7629 ) and that was 
> the reason in the first place I had to register the JodaDateTimeSerializer 
> with Kryo.
> 
> Regards,
> Federico
> 
> 
> 
> 
> 2017-09-30 18:08 GMT+02:00 Tzu-Li (Gordon) Tai  >:
> Hi,
> 
> Thanks for the extra info, it was helpful (I’m not sure why your first logs 
> didn’t have the full trace, though).
> 
> I spent some time digging through the error trace, and currently have some 
> observations I would like to go through first:
> 
> 1. So it seems like the ArrayIndexOutOfBoundsException was thrown while 
> trying to access the state and making a copy (via serialization) in the 
> CopyOnWriteStateTable.
> 2. The state that caused the exception seems to be the state of the reducing 
> window function (i.e. the maxBy). The state type should be the same as the 
> records in your `events` DataStream, which seems to be a Scala case class 
> with some nested field that requires Kryo for serialization.
> 3. Somehow Kryo failed with the ArrayIndexOutOfBoundsException when trying to 
> copy that field ..
> 
> My current guess would perhaps be that the serializer internally used may 
> have been incorrectly shared, which is probably why this exception happens 
> randomly for you.
> I recall that there were similar issues that occurred before due to the fact 
> that some KryoSerializers aren't thread-safe and was incorrectly shared in 
> Flink.
> 
> I may need some help from you to be able to look at this a bit more:
> - Is it possible that you disable asynchronous snapshots and try running this 
> job a bit more to see if the problem still occurs? This is mainly to 
> eliminate my guess on whether or not there is some incorrect serializer usage 
> in the CopyOnWriteStateTable.
> - Could you let us know what your `events` DataStream records type case class 
> looks like?
> 
> Also looping in Aljoscha and Stefan here, as they would probably have more 
> insights in this.
> 
> Cheers,
> Gordon
> 
> On 30 September 2017 at 10:56:33 AM, Federico D'Ambrosio 
> (federico.dambro...@smartlab.ws ) 
> wrote:
> 
>> Hi Gordon,
>> 
>> I remembered that I had already seen this kind of exception once during the 
>> testing of the current job and fortunately I had the complete stacktrace 
>> still saved on my pc:
>> 
>> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
>> at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
>> at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
>> at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:863)
>> at 
>> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:176)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
>> at 
>> org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>> at 
>> org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>> at 
>> org.apache.flink.runtime.state.heap.HeapReducingState.get(HeapReducingState.java:68)
>> at 
>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:498)
>> at 
>> 

Re: Best way to setup different log files for distinct jobs

2017-10-12 Thread r. r.
What if you have 'dedicated' task managers for each job?
So if you have 2 TMs, each with 1 task slot, and two jobs with -p 1, then each job
will go to its respective TM, I think?
Hence, each job ends up in its own (TM's) log.

I'm new to Flink, hope it makes sense.








 > Original message 
 > From: PedroMrChaves pedro.mr.cha...@gmail.com
 > Subject: Best way to setup different log files for distinct jobs
 > To: user@flink.apache.org
 > Sent on: 10.10.2017 14:29

> Hello,
> 
> I'm using logback as my logging framework. I would like to setup Flink so
> that each job outputs to a different file. Any Ideas on how could I do that? 
> 
> I am running flink in a standalone cluster with version 1.3.2.
> 
> Regards,
> Pedro Chaves.
> 
> -
> Best Regards,
> Pedro Chaves
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Running flink on YARN

2017-10-12 Thread Navneeth Krishnan
Hello,

I'm running Flink on AWS EMR and I would like to know how I can pass a
custom log4j properties file. I changed the log4j.properties file in the Flink
conf directory, but the changes don't seem to be reflected. Thanks.

I'm using the below command to start my flink job.
> flink run -m yarn-cluster

Regards,
Navneeth


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Aljoscha Krettek
Still not nice, though, and it took a while to finalise discovery for 1.4. ;-)

If you need that now you might be able to back port the 1.4 consumer to 1.3.

> On 12. Oct 2017, at 17:05, Gyula Fóra  wrote:
> 
> Ok, thanks for the clarification. :)
> 
> Gyula
> 
> 
> On Thu, Oct 12, 2017, 17:04 Aljoscha Krettek  > wrote:
> It might be old but it's not forgotten, the issue I created is actually 
> marked as a blocker so we won't forget it when releasing 1.3.3 and 1.4.0.
> 
> The issue in Kafka is about new topics/partitions not being discovered or 
> something else? That would be the expected behaviour in Flink < 1.4.0.
> 
> Best,
> Aljoscha
> 
>> On 12. Oct 2017, at 16:40, Gyula Fóra > > wrote:
>> 
>> Hey,
>> 
>> I know it's old discussion but there also seems to be a problem with the 
>> logic in the kafka source alone regarding new topics added after a 
>> checkpoint. 
>> 
>> Maybe there is a ticket for this already and I just missed it.
>> 
>> Cheers,
>> Gyula
>> 
>> Gyula Fóra > ezt írta (időpont: 
>> 2017. szept. 14., Cs, 14:53):
>> Good job for figuring this out!
>> This certainly seems to explain our problems.
>> 
>> Thanks!
>> Gyula
>> 
>> Aljoscha Krettek > ezt írta 
>> (időpont: 2017. szept. 14., Cs, 14:46):
>> After a bit more digging I found that the "isRestored" flag doesn't work 
>> correctly if there are operators chained to the sink that have state: 
>> https://issues.apache.org/jira/browse/FLINK-7623 
>> 
>> 
>> Blocker issue for 1.3.3 and 1.4.0.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 6. Sep 2017, at 16:05, Aljoscha Krettek >> > wrote:
>>> 
>>> After discussing this between Stefan and me we think that this should 
>>> actually work.
>>> 
>>> Do you have the log output from restoring the Kafka Consumer? It would be 
>>> interesting to see whether any of those print:
>>>  - 
>>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>>>  
>>> 
>>>  - 
>>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
>>>  
>>> 
>>> 
 On 6. Sep 2017, at 14:45, Aljoscha Krettek > wrote:
 
 Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer 
 which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we 
 store state in a union state, i.e. all sources get all partition on 
 restore and if they didn't get any they know that they are new. There is 
 no specific logic for detecting this situation, it's just that the 
 partition discoverer will be seeded with this information and it will know 
 if it discovers a new partition whether it can take ownership of that 
 partition.
 
 I'm sure Gordon (cc'ed) could explain it better than I did.
 
> On 6. Sep 2017, at 14:36, Gyula Fóra  > wrote:
> 
> Wouldnt it be enough that Kafka sources store some empty container for 
> there state if it is empty, compared to null when it should be 
> bootstrapped again?
> 
> Gyula
> 
> Aljoscha Krettek > ezt 
> írta (időpont: 2017. szept. 6., Sze, 14:31):
> The problem here is that context.isRestored() is a global flag and not 
> local to each operator. It says "yes this job was restored" but the 
> source would need to know that it is actually brand new and never had any 
> state. This is quite tricky to do, since there is currently no way (if 
> I'm correct) to differentiate between "I got empty state but others maybe 
> got state" and "this source never had state and neither had other 
> parallel instances".
> 
> Best,
> Aljoscha
> 
>> On 6. Sep 2017, at 13:56, Stefan Richter > > wrote:
>> 
>> Thanks for the report, I will take a look.
>> 
>>> Am 06.09.2017 um 11:48 schrieb Gyula 

Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Gyula Fóra
Ok, thanks for the clarification. :)

Gyula

On Thu, Oct 12, 2017, 17:04 Aljoscha Krettek  wrote:

> It might be old but it's not forgotten, the issue I created is actually
> marked as a blocker so we won't forget it when releasing 1.3.3 and 1.4.0.
>
> The issue in Kafka is about new topics/partitions not being discovered or
> something else? That would be the expected behaviour in Flink < 1.4.0.
>
> Best,
> Aljoscha
>
> On 12. Oct 2017, at 16:40, Gyula Fóra  wrote:
>
> Hey,
>
> I know it's old discussion but there also seems to be a problem with the
> logic in the kafka source alone regarding new topics added after a
> checkpoint.
>
> Maybe there is a ticket for this already and I just missed it.
>
> Cheers,
> Gyula
>
> Gyula Fóra  ezt írta (időpont: 2017. szept. 14., Cs,
> 14:53):
>
>> Good job for figuring this out!
>> This certainly seems to explain our problems.
>>
>> Thanks!
>> Gyula
>>
>> Aljoscha Krettek  ezt írta (időpont: 2017. szept.
>> 14., Cs, 14:46):
>>
>>> After a bit more digging I found that the "isRestored" flag doesn't work
>>> correctly if there are operators chained to the sink that have state:
>>> https://issues.apache.org/jira/browse/FLINK-7623
>>>
>>> Blocker issue for 1.3.3 and 1.4.0.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 6. Sep 2017, at 16:05, Aljoscha Krettek  wrote:
>>>
>>> After discussing this between Stefan and me we think that this should
>>> actually work.
>>>
>>> Do you have the log output from restoring the Kafka Consumer? It would
>>> be interesting to see whether any of those print:
>>>  -
>>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>>>  -
>>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
>>>
>>> On 6. Sep 2017, at 14:45, Aljoscha Krettek  wrote:
>>>
>>> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT
>>> consumer which also has discovery of new partitions. Starting from
>>> 1.4-SNAPSHOT we store state in a union state, i.e. all sources get all
>>> partition on restore and if they didn't get any they know that they are
>>> new. There is no specific logic for detecting this situation, it's just
>>> that the partition discoverer will be seeded with this information and it
>>> will know if it discovers a new partition whether it can take ownership of
>>> that partition.
>>>
>>> I'm sure Gordon (cc'ed) could explain it better than I did.
>>>
>>> On 6. Sep 2017, at 14:36, Gyula Fóra  wrote:
>>>
>>> Wouldnt it be enough that Kafka sources store some empty container for
>>> there state if it is empty, compared to null when it should be bootstrapped
>>> again?
>>>
>>> Gyula
>>>
>>> Aljoscha Krettek  ezt írta (időpont: 2017. szept.
>>> 6., Sze, 14:31):
>>>
 The problem here is that context.isRestored() is a global flag and not
 local to each operator. It says "yes this job was restored" but the source
 would need to know that it is actually brand new and never had any state.
 This is quite tricky to do, since there is currently no way (if I'm
 correct) to differentiate between "I got empty state but others maybe got
 state" and "this source never had state and neither had other parallel
 instances".

 Best,
 Aljoscha

 On 6. Sep 2017, at 13:56, Stefan Richter 
 wrote:

 Thanks for the report, I will take a look.

 Am 06.09.2017 um 11:48 schrieb Gyula Fóra :

 Hi all,

 We are running into some problems with the kafka source after changing
 the uid and restoring from the savepoint.
 What we are expecting is to clear the partition state, and set it up
 all over again, but what seems to happen is that the consumer thinks that
 it doesnt have any partitions assigned.

 This was supposed to be fixed in
 https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
 but appears to be reworked/reverted in the latest release :
 https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547

 What is the expected behaviour here?

 Thanks!
 Gyula




>>>
>>>
>>>
>


Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Aljoscha Krettek
It might be old but it's not forgotten, the issue I created is actually marked 
as a blocker so we won't forget it when releasing 1.3.3 and 1.4.0.

The issue in Kafka is about new topics/partitions not being discovered or 
something else? That would be the expected behaviour in Flink < 1.4.0.

Best,
Aljoscha

> On 12. Oct 2017, at 16:40, Gyula Fóra  wrote:
> 
> Hey,
> 
> I know it's old discussion but there also seems to be a problem with the 
> logic in the kafka source alone regarding new topics added after a 
> checkpoint. 
> 
> Maybe there is a ticket for this already and I just missed it.
> 
> Cheers,
> Gyula
> 
> Gyula Fóra > ezt írta (időpont: 
> 2017. szept. 14., Cs, 14:53):
> Good job for figuring this out!
> This certainly seems to explain our problems.
> 
> Thanks!
> Gyula
> 
> Aljoscha Krettek > ezt írta 
> (időpont: 2017. szept. 14., Cs, 14:46):
> After a bit more digging I found that the "isRestored" flag doesn't work 
> correctly if there are operators chained to the sink that have state: 
> https://issues.apache.org/jira/browse/FLINK-7623 
> 
> 
> Blocker issue for 1.3.3 and 1.4.0.
> 
> Best,
> Aljoscha
> 
>> On 6. Sep 2017, at 16:05, Aljoscha Krettek > > wrote:
>> 
>> After discussing this between Stefan and me we think that this should 
>> actually work.
>> 
>> Do you have the log output from restoring the Kafka Consumer? It would be 
>> interesting to see whether any of those print:
>>  - 
>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>>  
>> 
>>  - 
>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
>>  
>> 
>> 
>>> On 6. Sep 2017, at 14:45, Aljoscha Krettek >> > wrote:
>>> 
>>> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer 
>>> which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we 
>>> store state in a union state, i.e. all sources get all partition on restore 
>>> and if they didn't get any they know that they are new. There is no 
>>> specific logic for detecting this situation, it's just that the partition 
>>> discoverer will be seeded with this information and it will know if it 
>>> discovers a new partition whether it can take ownership of that partition.
>>> 
>>> I'm sure Gordon (cc'ed) could explain it better than I did.
>>> 
 On 6. Sep 2017, at 14:36, Gyula Fóra > wrote:
 
 Wouldnt it be enough that Kafka sources store some empty container for 
 there state if it is empty, compared to null when it should be 
 bootstrapped again?
 
 Gyula
 
 Aljoscha Krettek > ezt 
 írta (időpont: 2017. szept. 6., Sze, 14:31):
 The problem here is that context.isRestored() is a global flag and not 
 local to each operator. It says "yes this job was restored" but the source 
 would need to know that it is actually brand new and never had any state. 
 This is quite tricky to do, since there is currently no way (if I'm 
 correct) to differentiate between "I got empty state but others maybe got 
 state" and "this source never had state and neither had other parallel 
 instances".
 
 Best,
 Aljoscha
 
> On 6. Sep 2017, at 13:56, Stefan Richter  > wrote:
> 
> Thanks for the report, I will take a look.
> 
>> Am 06.09.2017 um 11:48 schrieb Gyula Fóra > >:
>> 
>> Hi all,
>> 
>> We are running into some problems with the kafka source after changing 
>> the uid and restoring from the savepoint.
>> What we are expecting is to clear the partition state, and set it up all 
>> over again, but what seems to happen is that the consumer thinks that it 
>> doesnt have any partitions assigned.
>> 
>> This was supposed to be fixed in 
>> 

Re: Submitting a job via command line

2017-10-12 Thread Piotr Nowojski
Have you tried this:
http://mail-archives.apache.org/mod_mbox/flink-user/201705.mbox/%3ccagr9p8bxhljseexwzvxlk+drotyp1yxjy4n4_qgerdzxz8u...@mail.gmail.com%3E
?

Piotrek

> On 12 Oct 2017, at 16:30, Alexander Smirnov  wrote:
> 
> Hello All,
>  
> I got the following error while attempting to execute a job via command line:
> 
> [root@flink01 bin]# ./flink run -c com.five9.stream.PrecomputeJob 
> /vagrant/flink-precompute-1.0-SNAPSHOT.jar -Xmx2048m -Xms2048m
> Cluster configuration: Standalone cluster with JobManager at 
> flink01.pb.lx-draskin5.five9.com/10.11.132.110:6123
> Using address flink01.pb.lx-draskin5.five9.com:6123 to connect to JobManager.
> JobManager web interface address http://flink01.pb.lx-draskin5.five9.com:8081 
> 
> Starting execution of program
> Submitting job with JobID: 222a9d44d2069ab3cc41866c8f3a. Waiting for job 
> completion.
> Connected to JobManager at 
> Actor[akka.tcp://fl...@flink01.pb.lx-draskin5.five9.com 
> :6123/user/jobmanager#-1899708478]
>  with leader session id ----.
>  
> 
> The program finished with the following exception:
>  
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Couldn't retrieve the JobExecutionResult from the 
> JobManager.
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
> at 
> org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
> at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
> at com.five9.stream.PrecomputeJob.execute(PrecomputeJob.java:137)
> at 
> com.five9.stream.PrecomputeJob.configureAndExecute(PrecomputeJob.java:78)
> at com.five9.stream.PrecomputeJob.main(PrecomputeJob.java:65)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
> at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
> at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
> at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
> at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
> at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
> at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
> at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
> retrieve the JobExecutionResult from the JobManager.
> at 
> org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
> at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
> at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
> ... 25 more
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
> submission to the JobManager timed out. You may increase 
> 'akka.client.timeout' in case the JobManager needs more time to configure and 
> confirm the job submission.
> at 
> org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
> at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
> at 
> 

Re: Monitoring job w/LocalStreamEnvironment

2017-10-12 Thread Piotr Nowojski
Hi,

Have you read the following doc?
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/testing.html
 


There are some hints regarding testing your application. Especially take a look 
at the example with using static field to communicate with the running job.

Piotrek
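
The static-field pattern from that page boils down to something like the sketch
below: a sink that collects into a static list that the test can assert on after
env.execute() returns (names and types are just illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class CollectSink implements SinkFunction<Long> {

    // static, so the test code and the locally executed job share the same list
    public static final List<Long> VALUES =
            Collections.synchronizedList(new ArrayList<Long>());

    @Override
    public void invoke(Long value) throws Exception {
        VALUES.add(value);
    }
}

// in the test:
//   env.fromElements(1L, 2L, 3L).addSink(new CollectSink());
//   env.execute();
//   assertEquals(3, CollectSink.VALUES.size());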

> On 12 Oct 2017, at 16:33, Ken Krugler  wrote:
> 
> Hi all,
> 
> With an iteration-based workflow, it’s helpful to be able to monitor the job 
> counters and explicitly terminate when the test has completed.
> 
> I didn’t see support for async job creation, though.
> 
> So I extended LocalStreamEnvironment to add an executeAsync(), which returns 
> the LocalFlinkMiniCluster.submitJobDetached() result.
> 
> But it appears like I need to have a ClusterClient in order to actually 
> monitor this job.
> 
> And ClusterClient is bound in with a lot of CLI code, so I’m hesitant to try 
> to extract what I need.
> 
> Is there an easier/recommended approach to the above?
> 
> Thanks!
> 
> — Ken
> 
> 
> http://about.me/kkrugler
> +1 530-210-6378
> 



Re: Beam Application run on cluster setup (Kafka+Flink)

2017-10-12 Thread Piotr Nowojski
Hi,

What do you mean by:

> With standalone beam application kafka can receive the message, But in
cluster setup it is not working.

In your example you are reading data from Kafka and printing it to the
console. There doesn't seem to be anything that writes back to Kafka, so what
do you mean by “Kafka can not receive the message”?

Did you check the output file of your application in the log directory? Did you
check the Flink logs for any errors?

Piotrek

> On 12 Oct 2017, at 15:49, Shankara  wrote:
> 
> Below is my setup 
>1. Kafka zookeeper and server in one machine (192.168.1.116) and
> producer (192.168.1.100) and consumer (192.168.1.117) in another machine.  
> --> This work fine no issue 
>2. Running standalone beam application with kafka consumer --> This
> work fine
>3. Running beam application in flink cluster with kafka consumer -->
> This is not working
>  Not receiving message from kafka producer.
> 
> Same program works fine with standalone with flink runner.
> Below is my code snippet.
> 
> public static void main(String[] args) {
>     Pipeline p = initializePipeline(args);
>     Map intelliOmIms = new TreeMap<>();
> 
>     PTransform<PBegin, PCollection<KV<Integer, byte[]>>> reader;
>     reader = KafkaIO.<Integer, byte[]>read()
>             .withBootstrapServers("192.168.1.116:9092")   // ---> Kafka zookeeper and server are running here
>             .withTopic("kafkatest")
>             .withKeyDeserializer(IntegerDeserializer.class)
>             .withValueDeserializer(IntelliOmImsKpiDataUtil.class)
>             .withoutMetadata();
> 
>     PCollection<KV<Integer, byte[]>> output = p.apply(reader);
>     output.apply(ParDo.of(new PrintMsg()));
> 
>     p.run().waitUntilFinish();
> }
> 
>  In IntelliOmImsKpiDataUtil deserializer I am just printing message saying
> that kafka is received the message.
> 
> public static class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {
> 
>@ProcessElement
>public void processElement(ProcessContext c) {
>System.out.println("Received Message  from kafkatest Topic ");
>}
> }
> 
>  Started Zookeeper in 192.168.1.116 like below :
>bin/zookeeper-server-start.sh config/zookeeper.properties
> 
>  Started Server in 192.168.1.116 like below :
>bin/kafka-server-start.sh config/server.properties
> 
>  Started Producer in 192.168.1.100 like below :
>bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 --topic
> kafkatest
> 
>  Started Consumer in 192.168.1.117 like below :
>bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 --topic
> kafkatest --from-beginning
> 
>   With standalone beam application kafka can receive the message, But in
> cluster setup it is not working.
> 
> Can you please help me to check it. 
> 
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Empty state restore seems to be broken for Kafka source (1.3.2)

2017-10-12 Thread Gyula Fóra
Hey,

I know it's old discussion but there also seems to be a problem with the
logic in the kafka source alone regarding new topics added after a
checkpoint.

Maybe there is a ticket for this already and I just missed it.

Cheers,
Gyula

Gyula Fóra  ezt írta (időpont: 2017. szept. 14., Cs,
14:53):

> Good job for figuring this out!
> This certainly seems to explain our problems.
>
> Thanks!
> Gyula
>
> Aljoscha Krettek  ezt írta (időpont: 2017. szept.
> 14., Cs, 14:46):
>
>> After a bit more digging I found that the "isRestored" flag doesn't work
>> correctly if there are operators chained to the sink that have state:
>> https://issues.apache.org/jira/browse/FLINK-7623
>>
>> Blocker issue for 1.3.3 and 1.4.0.
>>
>> Best,
>> Aljoscha
>>
>> On 6. Sep 2017, at 16:05, Aljoscha Krettek  wrote:
>>
>> After discussing this between Stefan and me we think that this should
>> actually work.
>>
>> Do you have the log output from restoring the Kafka Consumer? It would be
>> interesting to see whether any of those print:
>>  -
>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L611
>>  -
>> https://github.com/apache/flink/blob/f1a173addd99e5df00921b924352a39810d8d180/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L554
>>
>> On 6. Sep 2017, at 14:45, Aljoscha Krettek  wrote:
>>
>> Yes, and that's essentially what's happening in the 1.4-SNAPSHOT consumer
>> which also has discovery of new partitions. Starting from 1.4-SNAPSHOT we
>> store state in a union state, i.e. all sources get all partitions on restore
>> and if they didn't get any they know that they are new. There is no
>> specific logic for detecting this situation, it's just that the partition
>> discoverer will be seeded with this information and it will know if it
>> discovers a new partition whether it can take ownership of that partition.
>>
>> I'm sure Gordon (cc'ed) could explain it better than I did.
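
As a rough illustration of the union-state idea described above: this is a sketch of my own, not the actual FlinkKafkaConsumerBase code, and it assumes the OperatorStateStore#getUnionListState API available in recent Flink versions.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class UnionStateSourceSketch extends RichParallelSourceFunction<String>
        implements CheckpointedFunction {

    // With union state, every subtask sees the union of ALL subtasks' entries on restore.
    private transient ListState<String> partitionsUnionState;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        partitionsUnionState = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<String>("partitions", String.class));

        if (context.isRestored() && !partitionsUnionState.get().iterator().hasNext()) {
            // Restored job, but no subtask anywhere had any partition state:
            // this operator is effectively new and can bootstrap its assignment from scratch.
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // snapshot only the partitions owned by this subtask
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // emit records ...
    }

    @Override
    public void cancel() {
    }
}

The point is only that a restored subtask whose union state is empty can conclude that no parallel instance had state, which is exactly the distinction discussed above.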
>>
>> On 6. Sep 2017, at 14:36, Gyula Fóra  wrote:
>>
>> Wouldn't it be enough for the Kafka sources to store some empty container for
>> their state when it is empty, as opposed to null when it should be bootstrapped
>> again?
>>
>> Gyula
>>
>> Aljoscha Krettek  ezt írta (időpont: 2017. szept.
>> 6., Sze, 14:31):
>>
>>> The problem here is that context.isRestored() is a global flag and not
>>> local to each operator. It says "yes this job was restored" but the source
>>> would need to know that it is actually brand new and never had any state.
>>> This is quite tricky to do, since there is currently no way (if I'm
>>> correct) to differentiate between "I got empty state but others maybe got
>>> state" and "this source never had state and neither had other parallel
>>> instances".
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 6. Sep 2017, at 13:56, Stefan Richter 
>>> wrote:
>>>
>>> Thanks for the report, I will take a look.
>>>
>>> Am 06.09.2017 um 11:48 schrieb Gyula Fóra :
>>>
>>> Hi all,
>>>
>>> We are running into some problems with the kafka source after changing
>>> the uid and restoring from the savepoint.
>>> What we expect is for the partition state to be cleared and set up all
>>> over again, but what seems to happen is that the consumer thinks that it
>>> doesn't have any partitions assigned.
>>>
>>> This was supposed to be fixed in
>>> https://github.com/apache/flink/commit/0ecb5d0050b84ba48105836288d43ce4c4749459#diff-06bf4a7f73d98ef91309154654563475
>>> but appears to be reworked/reverted in the latest release :
>>> https://github.com/apache/flink/blob/0399beed1ea3e04d332b42cc506041d75a6148b4/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java#L547
>>>
>>> What is the expected behaviour here?
>>>
>>> Thanks!
>>> Gyula
>>>
>>>
>>>
>>>
>>
>>
>>


Monitoring job w/LocalStreamEnvironment

2017-10-12 Thread Ken Krugler
Hi all,

With an iteration-based workflow, it’s helpful to be able to monitor the job 
counters and explicitly terminate when the test has completed.

I didn’t see support for async job creation, though.

So I extended LocalStreamEnvironment to add an executeAsync(), which returns 
the LocalFlinkMiniCluster.submitJobDetached() result.

But it appears that I need a ClusterClient in order to actually monitor
this job.

And ClusterClient is bound in with a lot of CLI code, so I’m hesitant to try to 
extract what I need.

Is there an easier/recommended approach to the above?

Thanks!

— Ken


http://about.me/kkrugler
+1 530-210-6378



Submitting a job via command line

2017-10-12 Thread Alexander Smirnov
Hello All,

I got the following error while attempting to execute a job via command line:

[root@flink01 bin]# ./flink run -c com.five9.stream.PrecomputeJob 
/vagrant/flink-precompute-1.0-SNAPSHOT.jar -Xmx2048m -Xms2048m
Cluster configuration: Standalone cluster with JobManager at 
flink01.pb.lx-draskin5.five9.com/10.11.132.110:6123
Using address flink01.pb.lx-draskin5.five9.com:6123 to connect to JobManager.
JobManager web interface address 
http://flink01.pb.lx-draskin5.five9.com:8081
Starting execution of program
Submitting job with JobID: 222a9d44d2069ab3cc41866c8f3a. Waiting for job 
completion.
Connected to JobManager at 
Actor[akka.tcp://fl...@flink01.pb.lx-draskin5.five9.com:6123/user/jobmanager#-1899708478]
 with leader session id 00000000-0000-0000-0000-000000000000.


The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program 
execution failed: Couldn't retrieve the JobExecutionResult from the JobManager.
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:478)
at 
org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:105)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:442)
at 
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:73)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1499)
at com.five9.stream.PrecomputeJob.execute(PrecomputeJob.java:137)
at 
com.five9.stream.PrecomputeJob.configureAndExecute(PrecomputeJob.java:78)
at com.five9.stream.PrecomputeJob.main(PrecomputeJob.java:65)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:528)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:419)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:381)
at 
org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:838)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:259)
at 
org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1086)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1133)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1130)
at 
org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1130)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Couldn't 
retrieve the JobExecutionResult from the JobManager.
at 
org.apache.flink.runtime.client.JobClient.awaitJobResult(JobClient.java:309)
at 
org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:396)
at 
org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:467)
... 25 more
Caused by: 
org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
submission to the JobManager timed out. You may increase 'akka.client.timeout' 
in case the JobManager needs more time to configure and confirm the job 
submission.
at 
org.apache.flink.runtime.client.JobSubmissionClientActor.handleCustomMessage(JobSubmissionClientActor.java:119)
at 
org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:251)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:89)
at 
org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
at 
akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at 
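
If it helps, the exception itself points at the client-side submission timeout. A hedged example of raising it in conf/flink-conf.yaml (the 600 s value is an arbitrary assumption, not a recommendation from this thread):

   akka.client.timeout: 600 s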

Beam Application run on cluster setup (Kafka+Flink)

2017-10-12 Thread Shankara
Below is my setup 
1. Kafka zookeeper and server on one machine (192.168.1.116), and
producer (192.168.1.100) and consumer (192.168.1.117) on other machines.
--> This works fine, no issue
2. Running the standalone Beam application with a Kafka consumer --> This
works fine
3. Running the Beam application in the Flink cluster with a Kafka consumer -->
This is not working
  No messages are received from the Kafka producer.

The same program works fine standalone with the Flink runner.
Below is my code snippet.

public static void main(String[] args) {
Pipeline p = initializePipeline(args);
Map> intelliOmIms = new TreeMap<>();

PTransform>> reader;
reader = KafkaIO.read()
.withBootstrapServers("192.168.1.116:9092") // Kafka zookeeper and server running here
.withTopic("kafkatest")
.withKeyDeserializer(IntegerDeserializer.class)
.withValueDeserializer(IntelliOmImsKpiDataUtil.class)
.withoutMetadata();

PCollection> output = p.apply(reader);
output.apply(ParDo.of(new PrintMsg()));

p.run().waitUntilFinish();
}

  In the IntelliOmImsKpiDataUtil deserializer I am just printing a message saying
that Kafka has received the message.

public static class PrintMsg extends DoFn, Void> {

@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Received Message  from kafkatest Topic ");
}
}

  Started Zookeeper in 192.168.1.116 like below :
bin/zookeeper-server-start.sh config/zookeeper.properties
  
  Started Server in 192.168.1.116 like below :
bin/kafka-server-start.sh config/server.properties
  
  Started Producer in 192.168.1.100 like below :
bin/kafka-console-producer.sh --broker-list 192.168.1.116:9092 --topic
kafkatest

  Started Consumer in 192.168.1.117 like below :
bin/kafka-console-consumer.sh --zookeeper 192.168.1.116:2181 --topic
kafkatest --from-beginning

   With the standalone Beam application the Kafka messages are received, but in
the cluster setup it is not working.

Can you please help me check it?
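
One hedged suggestion, not something established in this thread: on a cluster, System.out from the DoFn usually ends up in the TaskManager's *.out file rather than in the client console or the job log, so a logger makes the output easier to find. A minimal sketch of PrintMsg using SLF4J; the element type KV<Integer, byte[]> is only a guess, since the generics were stripped by the mail archive:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PrintMsg extends DoFn<KV<Integer, byte[]>, Void> {

    private static final Logger LOG = LoggerFactory.getLogger(PrintMsg.class);

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Goes through the TaskManager's logging framework, so it shows up in the
        // TaskManager log files that the Flink dashboard exposes.
        LOG.info("Received Message from kafkatest Topic");
    }
}

The log line should then be visible in the TaskManager logs of the cluster rather than only in a local console.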




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Writing to an HDFS file from a Flink stream job

2017-10-12 Thread Piotr Nowojski
I think the issue might be that writeAsText (TextOutputFormat) doesn’t flush 
the data anywhere (only on close, which in streaming doesn’t happen). You would 
need to use a custom output format, but as Aljoscha pointed out, BucketingSink 
makes more sense for streaming applications.

Piotrek
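
For reference, a minimal sketch of what switching from writeAsText() to the BucketingSink mentioned above might look like; the bucketer, writer and batch size are arbitrary assumptions, not recommendations from this thread:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;
import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

public class HdfsSinkExample {

    // Attach a BucketingSink to an existing stream instead of calling writeAsText().
    public static void addHdfsSink(DataStream<String> stream) {
        BucketingSink<String> sink =
                new BucketingSink<>("hdfs://hadoop-master:9000/user/isuru/foo");
        sink.setBucketer(new DateTimeBucketer<String>("yyyy-MM-dd--HH")); // one bucket directory per hour
        sink.setWriter(new StringWriter<String>());                       // write records as plain text lines
        sink.setBatchSize(64 * 1024 * 1024);                              // roll part files at roughly 64 MB

        stream.addSink(sink);
    }
}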

> On 12 Oct 2017, at 14:58, Aljoscha Krettek  wrote:
> 
> Hi Isuru,
> 
> What is the source in your job and is the job terminating at some point or 
> running continuously?
> 
> In general, the writeAsText()/writeAsCsv() methods should not be used because 
> they don't work well in an infinite streaming job that might have failures 
> and recovery, i.e., what would that mean for the file if there is a recovery? 
> For writing to files you would use the BucketingSink: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/filesystem_sink.html
>  
> 
> 
> Best,
> Aljoscha
> 
>> On 12. Oct 2017, at 14:55, Piotr Nowojski wrote:
>> 
>> Hi,
>> 
>> Maybe this is an access rights issue? Could you try to create and write to 
>> same file (same directory) in some other way (manually?), using the same 
>> user and the same machine as would Flink job do?
>> 
>> Maybe there will be some hint in hdfs logs?
>> 
>> Piotrek
>> 
>>> On 12 Oct 2017, at 00:19, Isuru Suriarachchi wrote:
>>> 
>>> Hi all,
>>> 
>>> I'm just trying to use an HDFS file as the sink for my flink stream job. I 
>>> use the following line to do so.
>>> 
>>> stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");
>>> 
>>> I have not set "fs.hdfs.hadoopconf" in my flink configuration as it should 
>>> work with the full hdfs file name according to [1]. 
>>> 
>>> However, it doesn't work as expected. File foo is created on hdfs. But that 
>>> file is empty, and I don't see any error logs on the Flink side either. When I 
>>> used a normal file sink with a "file:///..." URL, it works fine 
>>> and the data is there in the file.
>>> 
>>> Do I need any other configuration to get this working?
>>> 
>>> Thanks,
>>> Isuru
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs
>>>  
>>> 
> 



Re: Kafka 11 connector on Flink 1.3

2017-10-12 Thread Syed Moizuddin
Thanks. I want to use the exactly-once Kafka producer for my use case.
Hence the question.

I will wait for the update on the timelines.

Thanks again.
Moiz

On Oct 12, 2017 5:26 PM, "Piotr Nowojski"  wrote:

> Hi,
>
> Kafka 0.11 connector depends on some API changes for Flink 1.4, so without
> rebasing the code and solving some small issues it is not possible to use
> it for 1.3.x.
>
> We are about to finalize the timeframe for the 1.4 release; it would be
> great if you could come back with this question after the weekend.
>
> If you do not need exactly-once Kafka producer that will come with Kafka
> 0.11 connector, you should be able to use 0.10 connector to read
> (exactly-once) and write (at-least-once) to Kafka 0.11.
>
> Piotrek
>
> > On 12 Oct 2017, at 10:44, Syed Moizuddin  wrote:
> >
> > Hi,
> >
> > I was just wondering if I could use the Kafka 11 connector on 1.3.
> > If there are dependencies, then what would be the timeframe for 1.4
> release
> >
> > Thanks
> > Moiz
>
>


Re: Writing to an HDFS file from a Flink stream job

2017-10-12 Thread Piotr Nowojski
Hi,

Maybe this is an access rights issue? Could you try to create and write to same 
file (same directory) in some other way (manually?), using the same user and 
the same machine as would Flink job do?

Maybe there will be some hint in hdfs logs?

Piotrek

> On 12 Oct 2017, at 00:19, Isuru Suriarachchi  wrote:
> 
> Hi all,
> 
> I'm just trying to use an HDFS file as the sink for my flink stream job. I 
> use the following line to do so.
> 
> stream.writeAsText("hdfs://hadoop-master:9000/user/isuru/foo");
> 
> I have not set "fs.hdfs.hadoopconf" in my flink configuration as it should 
> work with the full hdfs file name according to [1]. 
> 
> However, it doesn't work as expected. File foo is created on hdfs. But that 
> file is empty, and I don't see any error logs on the Flink side either. When I used 
> a normal file sink with a "file:///..." URL, it works fine and the data is there 
> in the file.
> 
> Do I need any other configuration to get this working?
> 
> Thanks,
> Isuru
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#hdfs
>  
> 


Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Chesnay Schepler

Well damn, I should've read the second part of the initial mail.

I'm wondering though, could you not unit-test this behavior?

On 12.10.2017 14:25, Chesnay Schepler wrote:
You could also write a custom reporter that opens a socket or similar
for communication purposes.

You can then either query it for the metrics, or even just trigger the
verification in the reporter, and fail with an error if the reporter
returns an error.
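
A minimal sketch of such a reporter, collecting metrics in a static in-memory map instead of a socket; the class name is mine, and it still has to be registered in the test cluster's configuration via metrics.reporters / metrics.reporter.<name>.class:

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Collects every registered metric in a static map so a test can assert on it
// after the job has run.
public class InMemoryTestReporter implements MetricReporter {

    public static final Map<String, Metric> METRICS = new ConcurrentHashMap<>();

    @Override
    public void open(MetricConfig config) {
    }

    @Override
    public void close() {
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.put(group.getMetricIdentifier(metricName), metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        METRICS.remove(group.getMetricIdentifier(metricName));
    }
}

A test could then run the job against a (mini) cluster configured with this reporter and assert on the Meter it finds in METRICS.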

On 12.10.2017 14:02, Piotr Nowojski wrote:

Hi,

Doing as you proposed using JMXReporter (or custom reporter) should 
work. I think there is no easier way to do this at the moment.


Piotrek

On 12 Oct 2017, at 04:58, Colin Williams 
 wrote:


I have a RichMapFunction and I'd like to ensure Meter fields are 
properly incremented. I've been trying to think of the best way to 
do this. Currently I think that I'd need to either implement my own 
reporter (or use JMX) and write to a socket, create a listener and 
wait for the reporter to send the message.


Is this a good approach for writing the test, or should I be 
considering something else?









Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Chesnay Schepler
You could also write a custom reporter that opens a socket or similar
for communication purposes.

You can then either query it for the metrics, or even just trigger the
verification in the reporter, and fail with an error if the reporter
returns an error.

On 12.10.2017 14:02, Piotr Nowojski wrote:

Hi,

Doing as you proposed using JMXReporter (or custom reporter) should work. I 
think there is no easier way to do this at the moment.

Piotrek


On 12 Oct 2017, at 04:58, Colin Williams  
wrote:

I have a RichMapFunction and I'd like to ensure Meter fields are properly 
incremented. I've been trying to think of the best way to do this. Currently I 
think that I'd need to either implement my own reporter (or use JMX) and write 
to a socket, create a listener and wait for the reporter to send the message.

Is this a good approach for writing the test, or should I be considering 
something else?






Re: Writing an Integration test for flink-metrics

2017-10-12 Thread Piotr Nowojski
Hi,

Doing as you proposed using JMXReporter (or custom reporter) should work. I 
think there is no easier way to do this at the moment.

Piotrek

> On 12 Oct 2017, at 04:58, Colin Williams  
> wrote:
> 
> I have a RichMapFunction and I'd like to ensure Meter fields are properly 
> incremented. I've been trying to think of the best way to do this. Currently 
> I think that I'd need to either implement my own reporter (or use JMX) and 
> write to a socket, create a listener and wait for the reporter to send the 
> message.
> 
> Is this a good approach for writing the test, or should I be considering 
> something else?



Re: Kafka 11 connector on Flink 1.3

2017-10-12 Thread Piotr Nowojski
Hi,

Kafka 0.11 connector depends on some API changes for Flink 1.4, so without 
rebasing the code and solving some small issues it is not possible to use it 
for 1.3.x.

We are about to finalize the timeframe for the 1.4 release; it would be great if 
you could come back with this question after the weekend.

If you do not need exactly-once Kafka producer that will come with Kafka 0.11 
connector, you should be able to use 0.10 connector to read (exactly-once) and 
write (at-least-once) to Kafka 0.11.

Piotrek

> On 12 Oct 2017, at 10:44, Syed Moizuddin  wrote:
> 
> Hi,
> 
> I was just wondering if I could use the Kafka 11 connector on 1.3.
> If there are dependencies, then what would be the timeframe for 1.4 release
> 
> Thanks
> Moiz



Re: Exception in BucketingSink when cancelling Flink job

2017-10-12 Thread Erik van Oosten
Hi Wangsan,

We were struggling with this for many days as well. In the end we found a
work-around. Well, "work-around": this for sure qualifies as one of the ugliest hacks I
have ever contemplated. Our work-around for Flink immediately interrupting the
close is to continue closing on another thread! Here is an example in Scala:

class MyBucketingSink[A](basePath: String) extends BucketingSink[A](basePath) {

  override def close(): Unit = {
    //
    // Unfortunately, Flink closes very very very very eagerly. So eagerly in fact
    // that it will try to kill us by interrupting the current thread immediately.
    // Let's try to continue on a different thread :evil-grin:
    //

    def superClose(): Unit = super.close()

    new Thread(
      new Runnable {
        override def run(): Unit = {
          logger.info("Close invoked on MyBucketingSink on task " +
            getRuntimeContext.getTaskNameWithSubtasks)
          try {
            superClose()
          } catch {
            case e: Throwable => logger.error(e)("Failed to close task " +
              getRuntimeContext.getTaskNameWithSubtasks)
          }
        }
      },
      "Closing task " + getRuntimeContext.getTaskNameWithSubtasks
    ).start()
  }
}

Obviously, if the close hangs, the entire job will hang and Flink will need to 
be fully restarted.
Please let us know if you see any other problems with this approach.

Kind regards,
Erik.



> On 27 Sep 2017, at 07:33, wangsan wrote:
> 
> After digging into the source code, we found that when a Flink job is canceled,
> a TaskCanceler thread is created.
> 
> The TaskCanceler thread calls cancel() on the invokable and periodically
> interrupts the task thread until it has terminated.
> 
> try {
>   invokable.cancel();
> } catch (Throwable t) {
>   logger.error("Error while canceling the task {}.", taskName, t);
> }
> // ...
> executer.interrupt();
> try {
>   executer.join(interruptInterval);
> } catch (InterruptedException e) {
>   // we can ignore this
> }
> // ...
> Notice that TaskCanceler first sends an interrupt signal to the task thread and
> then calls join on it. Since the task thread is now trying to close the
> DFSOutputStream, which is waiting for an ack, an InterruptedException is
> thrown in the task thread.
> 
> synchronized (dataQueue) {
>   while (!streamerClosed) {
>     checkClosed();
>     if (lastAckedSeqno >= seqno) {
>       break;
>     }
>     try {
>       dataQueue.wait(1000); // when we receive an ack, we notify on dataQueue
>     } catch (InterruptedException ie) {
>       throw new InterruptedIOException(
>           "Interrupted while waiting for data to be acknowledged by pipeline");
>     }
>   }
> }
> I was confused about why TaskCanceler calls executer.interrupt() before
> executer.join(interruptInterval). Can anyone help?
> 
> 
> 
> 
> 
> 
> Hi,
> 
> We are currently using BucketingSink to save data into HDFS in Parquet
> format. But when the Flink job was cancelled, we always got an exception in
> BucketingSink's close method. The detailed exception info is as below:
> [ERROR] [2017-09-26 20:51:58,893] 
> [org.apache.flink.streaming.runtime.tasks.StreamTask] - Error during disposal 
> of stream operator.
> java.io.InterruptedIOException: Interrupted while waiting for data to be 
> acknowledged by pipeline
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2151)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2130)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2266)
>   at 
> org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2236)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>   at 
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>   at 
> org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:643)
>   at 
> org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:117)
>   at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:301)
> ...
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:429)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:334)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>   at java.lang.Thread.run(Thread.java:745)
> 
> It seems that the DFSOutputStream hasn't been closed before the task thread is
> forcibly terminated. We found a similar problem in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Changing-timeout-for-cancel-command-td12601.html
>  
> 

Re: Implement bunch of transformations applied to same source stream in Apache Flink in parallel and combine result

2017-10-12 Thread Piotr Nowojski
Hi,

What is the number of events per second that you wish to process? If it's high
enough (~ number of machines * number of cores) you should be just fine:
instead of scaling with the number of features, scale with the number of events. If you
have a single data source you could still randomly shuffle events before
applying your transformations.

Another solution might be to:
1. Assign a unique eventId and split the original event using flatMap into
per-feature tuples
2. keyBy featureId, eventId (or maybe do random partitioning with shuffle?)
3. perform the transformation
4. keyBy eventId, ...
5. Window and reduce

But that would add more overhead compared to processing more events at the same
time. A rough sketch of these steps follows below.
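
A rough, hedged sketch of steps 1-5 in the DataStream API; the Event type, the feature count and the extraction logic are placeholders of my own, and step 2 is simplified to a plain keyBy on featureId:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class FeatureFanOutSketch {

    // Hypothetical event type standing in for the (X1...Xn) vector from the question.
    public static class Event {
        public long eventId;
        public double[] fields;
    }

    private static final int NUM_FEATURES = 50;

    // Placeholder for the real per-feature extraction code.
    private static double extractFeature(int featureId, Event e) {
        return e.fields[featureId % e.fields.length];
    }

    public static DataStream<Tuple2<Long, double[]>> buildPipeline(DataStream<Event> events) {
        // 1. fan each event out into (eventId, featureId, value) tuples
        DataStream<Tuple3<Long, Integer, Double>> features = events
                .flatMap(new FlatMapFunction<Event, Tuple3<Long, Integer, Double>>() {
                    @Override
                    public void flatMap(Event e, Collector<Tuple3<Long, Integer, Double>> out) {
                        for (int featureId = 0; featureId < NUM_FEATURES; featureId++) {
                            out.collect(Tuple3.of(e.eventId, featureId, extractFeature(featureId, e)));
                        }
                    }
                });

        // 2./3. distribute by featureId so the per-feature transformations run in parallel
        DataStream<Tuple3<Long, Integer, Double>> transformed = features
                .keyBy(1)
                .map(new MapFunction<Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>>() {
                    @Override
                    public Tuple3<Long, Integer, Double> map(Tuple3<Long, Integer, Double> value) {
                        return value; // the real per-feature transformation goes here
                    }
                });

        // 4./5. bring all features of an event back together and reduce them into one vector
        return transformed
                .keyBy(0)
                .timeWindow(Time.minutes(10))
                .apply(new WindowFunction<Tuple3<Long, Integer, Double>, Tuple2<Long, double[]>, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple key, TimeWindow window,
                                      Iterable<Tuple3<Long, Integer, Double>> values,
                                      Collector<Tuple2<Long, double[]>> out) {
                        double[] vector = new double[NUM_FEATURES];
                        for (Tuple3<Long, Integer, Double> v : values) {
                            vector[v.f1] = v.f2;
                        }
                        Long eventId = key.getField(0);
                        out.collect(Tuple2.of(eventId, vector));
                    }
                });
    }
}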

Piotrek

> On 11 Oct 2017, at 23:02, Andrey Salnikov  wrote:
> 
> Hi!
> 
> Could you please help me - I'm trying to use Apache Flink for machine 
> learning tasks with external ensemble/tree libs like XGBoost, so my workflow 
> will be like this:
> 
> receive a single stream of data whose atomic event looks like a simple vector
> event=(X1, X2, X3...Xn) and can be imagined as POJO fields, so initially we
> have DataStream source=...
> a lot of feature extraction code applied to the same event source: feature1
> = source.map(X1...Xn), feature2 = source.map(X1...Xn), etc. For simplicity let's
> say DataStream feature(i) = source.map() for all features
> then I need to create a vector with the extracted features (feature1, feature2,
> ...featureK); for now it will be 40-50 features, but I'm sure it will contain
> more items in the future and can easily contain 100-500 features and more
> put these extracted features into dataset/table columns over a 10-minute window
> and run the final machine learning task on that 10 minutes of data
> In simple words, I need to apply several quite different map operations to the
> same single event in the stream and then combine the results from all map
> functions into a single vector.
> 
> So for now I can't figure out how to implement the final reduce step and run all
> feature extraction map jobs in parallel if possible. I spent several days on the
> Flink docs site, youtube videos, googling, and reading Flink's sources, but it
> seems I'm really stuck here.
> 
> The easy solution here would be to use a single map operation and run each
> feature extraction sequentially, one by one, in a huge map body, and then
> return the final vector (Feature1...FeatureK) for each input event. But that
> seems crazy and non-optimal.
> 
> Another solution is to join each pair of feature streams, since all feature
> DataStreams have the same initial event and the same key and only apply some
> transformation code, but it looks ugly: writing 50 joins with some window.
> And I think that joins and cogroups were developed for joining different streams
> from different sources and not for such map/reduce operations.
> 
> It feels to me that for all these map operations there should be something
> simple which I'm missing.
> 
> Could you please point me to how you implement such tasks in Flink, if
> possible with example code?
> 
> PS: I posted this question to stackoverflow.
> PPS: If I use feature1.union(feature2...featureK) I still somehow need to
> separate and combine the feature vectors before the sink, and preserve the order
> of the final vectors.
> 
> Thanks,
> Andrey



Kafka 11 connector on Flink 1.3

2017-10-12 Thread Syed Moizuddin
Hi,

I was just wondering if I could use the Kafka 11 connector on 1.3.
If there are dependencies, then what would be the timeframe for 1.4 release

Thanks
Moiz


Re: PartitionNotFoundException when running in yarn-session.

2017-10-12 Thread Ufuk Celebi
Hey Niels,

Flink currently restarts the complete job if you have a restart
strategy configured:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/restart_strategies.html.

I agree that only restarting the required parts of the pipeline is an
important optimization. Flink has not implemented this (fully) yet but
it's on the agenda [1] and work has already started [2].

In this particular case, everything is just slow and we don't need the
restart at all if you give the consumer a higher max timeout.

Please report back when you have more info :-)

– Ufuk

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-1+%3A+Fine+Grained+Recovery+from+Task+Failures

[2] https://issues.apache.org/jira/browse/FLINK-4256
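
For reference, a hedged example of how the backoff option Ufuk mentions further down could be raised in conf/flink-conf.yaml; the 30000 ms value matches his suggestion but should be tuned to the actual deployment:

   taskmanager.network.request-backoff.max: 30000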

On Thu, Oct 12, 2017 at 10:17 AM, Niels Basjes  wrote:
> Hi,
>
> I'm currently doing some tests to see it this info helps.
> I was running a different high CPU task on one of the nodes outside Yarn, so
> I took that one out of the cluster to see if that helps.
>
> What I do find strange is that in this kind of error scenario the entire job
> fails.
> I would have expected something similar to 'good old' MapReduce: the
> missing task is simply resubmitted and run again.
> Why doesn't that happen?
>
>
> Niels
>
> On Wed, Oct 11, 2017 at 8:49 AM, Ufuk Celebi  wrote:
>>
>> Hey Niels,
>>
>> any update on this?
>>
>> – Ufuk
>>
>>
>> On Mon, Oct 9, 2017 at 10:16 PM, Ufuk Celebi  wrote:
>> > Hey Niels,
>> >
>> > thanks for the detailed report. I don't think that it is related to
>> > the Hadoop or Scala version. I think the following happens:
>> >
>> > - Occasionally, one of your tasks seems to be extremely slow in
>> > registering its produced intermediate result (the data shuffled
>> > between TaskManagers)
>> > - Another task is already requesting to consume data from this task
>> > but cannot find it (after multiple retries) and it fails the complete
>> > job (your stack trace)
>> >
>> > That happens only occasionally probably due to load in your cluster.
>> > The slow down could have multiple reasons...
>> > - Is your Hadoop cluster resource constrained and the tasks are slow to
>> > deploy?
>> > - Is your application JAR very large and needs a lot of time
>> > downloading?
>> >
>> > We have two options at this point:
>> > 1) You can increase the maximum retries via the config option
>> > "taskmanager.network.request-backoff.max". The default is 10000
>> > (milliseconds) and specifies what the maximum request back-off is [1].
>> > Increasing this to 30000 would give you two extra retries with pretty
>> > long delays (see [1]).
>> >
>> > 2) To be sure that this is really what is happening we could increase
>> > the log level of certain classes and check whether they have
>> > registered their results or not. If you want to do this, I'm more than
>> > happy to provide you with some classes to enable DEBUG logging for.
>> >
>> > What do you think?
>> >
>> > – Ufuk
>> >
>> > DETAILS
>> > ===
>> >
>> > - The TaskManagers produce and consume intermediate results
>> > - When a TaskManager wants to consume a result, it directly queries
>> > the producing TaskManager for it
>> > - An intermediate result becomes ready for consumption during initial
>> > task setup (state DEPLOYING)
>> > - When a TaskManager is slow to register its intermediate result and
>> > the consumer requests the result before it is ready, it can happen
>> > that a requested partition is "not found"
>> >
>> > This is what is also happening here. We retry to request the
>> > intermediate result multiple times with timed backoff [1] and only
>> > fail the request (your stack trace) if the partition is still not
>> > ready although we expect it to be ready (that is there was no failure
>> > at the producing task).
>> >
>> > [1] Starting by default at 100 millis and going up to 10_000 millis by
>> > doubling that time (100, 200, 400, 800, 1600, 3200, 6400, 10000)
>> >
>> >
>> > On Mon, Oct 9, 2017 at 10:51 AM, Niels Basjes  wrote:
>> >> Hi,
>> >>
>> >> I'm having some trouble running a java based Flink job in a
>> >> yarn-session.
>> >>
>> >> The job itself consists of reading a set of files resulting in a
>> >> DataStream
>> >> (I use DataStream because in the future I intend to change the file
>> >> with a
>> >> Kafka feed), then does some parsing and eventually writes the data into
>> >> HBase.
>> >>
>> >> Most of the time running this works fine yet sometimes it fails with
>> >> this
>> >> exception:
>> >>
>> >>
>> >> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
>> >> Partition
>> >> 794b5ce385c296b7943fa4c1f072d6b9@13aa7ef02a5d9e0898204ec8ce283363
>> >> not found.
>> >>   at
>> >>
>> >> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:203)
>> >>   at
>> >>
>> >> 

Re: R/W traffic estimation between Flink and Zookeeper

2017-10-12 Thread Piotr Nowojski
Hi,

Are you asking how to measure records/s, or whether it is possible to achieve such a rate? To
measure it you can check the numRecordsInPerSecond metric.

As for whether 1000 records/s is possible, it depends on many things like the state
backend used, state size, complexity of your application, size of the records,
number of machines, their hardware and the network. In the very simplest cases
it is possible to achieve millions of records per second per machine. It would
be best to try it out for your particular use case at some small scale.

Piotrek

> On 11 Oct 2017, at 19:58, Hao Sun  wrote:
> 
> Hi, is there a way to estimate read/write traffic between Flink and ZooKeeper?
> I am looking for something like 1000 reads/sec or 1000 writes/sec, and the
> size of the messages.
> 
> Thanks



Re: Write each group to its own file

2017-10-12 Thread Piotr Nowojski
Hi,

There is no straightforward way to do that. First of all, the error you are
getting is because you are trying to start a new application
(env.fromElements(items)) inside your reduce function.

To do what you want, you have to hash partition the products based on category 
(instead of grouping by and reducing) and after that either:

1. Sort the hash-partitioned products and implement a custom OutputFormat (maybe
based on FileOutputFormat) that starts a new file when the key value
changes.

Or

2. Implement a custom OutputFormat (maybe based on FileOutputFormat) that
keeps multiple open files - one file per category - and writes records
accordingly.

Note that both options require hash partitioning the products first. Option 1 will be
more CPU and memory consuming (it has to sort the data); option 2 can exceed the
maximum number of simultaneously open files if the number of categories is very
high. A rough sketch of option 2 is below.
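
A minimal sketch of option 2, keeping one open writer per category; it assumes the products were hash partitioned on category first so that each category is written by exactly one subtask, and the Product POJO below is only a stand-in for the Scala case class from the question:

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class PerCategoryOutputFormat implements OutputFormat<PerCategoryOutputFormat.Product> {

    // Java mirror of the Scala case class from the question.
    public static class Product {
        public String name;
        public String category;

        public Product() {}

        public Product(String name, String category) {
            this.name = name;
            this.category = category;
        }
    }

    private final String baseDir;
    private transient Map<String, BufferedWriter> writers;

    public PerCategoryOutputFormat(String baseDir) {
        this.baseDir = baseDir;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) {
        writers = new HashMap<>();
    }

    @Override
    public void writeRecord(Product record) throws IOException {
        // One writer per category; assumes the input was partitioned by category,
        // so no two subtasks ever write the same file.
        BufferedWriter writer = writers.get(record.category);
        if (writer == null) {
            writer = new BufferedWriter(new FileWriter(baseDir + "/" + record.category + ".csv", true));
            writers.put(record.category, writer);
        }
        writer.write(record.name + ", " + record.category);
        writer.newLine();
    }

    @Override
    public void close() throws IOException {
        for (BufferedWriter writer : writers.values()) {
            writer.close();
        }
    }
}

It could then be wired up roughly as products.partitionByHash("category").output(new PerCategoryOutputFormat("/tmp/out")), with the Product type adapted to the actual job.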

Piotrek

> On 11 Oct 2017, at 17:47, rlazoti  wrote:
> 
> Hi,
> 
> Is there a way to write each group to its own file using the Dataset api
> (Batch)?
> 
> For example, lets use the following class:
> 
> case class Product(name: String, category: String)
> 
> And the following Dataset:
> 
> val products = env.fromElements(Product("i7", "cpu"), Product("R5", "cpu"),
> Product("gtx1080", "gpu"), Product("vega64", "gpu"), Product("evo250gb",
> "ssd"))
> 
> So in this example my output should be these 3 files:
> 
> - cpu.csv
> i7, cpu
> R5, cpu
> 
> - gpu.csv
> gtx1080, gpu
> vega64, gpu
> 
> - ssd.csv
> evo250gb, ssd
> 
> 
> I tried the following code, but got
> org.apache.flink.api.common.InvalidProgramException: Task not serializable.
> 
> products.groupBy("category").reduceGroup { group: Iterator[Product] =>
>  val items = group.toSeq
>  env.fromElements(items).writeAsCsv(s"${items.head.category}.csv")
>  items
> }
> 
> I welcome any of your inputs.
> 
> Thanks!
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/