[jira] [Created] (FLINK-7488) TaskManagerHeapSizeCalculationJavaBashTest sometimes fails

2017-08-21 Thread Ted Yu (JIRA)
Ted Yu created FLINK-7488:
-

 Summary: TaskManagerHeapSizeCalculationJavaBashTest sometimes fails
 Key: FLINK-7488
 URL: https://issues.apache.org/jira/browse/FLINK-7488
 Project: Flink
  Issue Type: Test
Reporter: Ted Yu
Priority: Minor


{code}
  
TaskManagerHeapSizeCalculationJavaBashTest.compareHeapSizeShellScriptWithJava:110->compareHeapSizeJavaVsScript:275
 Different heap sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]1000> but was:<[Setting 
HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.Using the 
result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]1000>
  
TaskManagerHeapSizeCalculationJavaBashTest.compareNetworkBufShellScriptWithJava:81->compareNetworkBufJavaVsScript:235
 Different network buffer memory sizes with configuration: 
{taskmanager.network.memory.fraction=0.1, taskmanager.memory.off-heap=false, 
taskmanager.memory.fraction=0.7, taskmanager.memory.size=-1, 
taskmanager.network.memory.max=1073741824, taskmanager.heap.mb=1000, 
taskmanager.network.memory.min=67108864} expected:<[]104857600> but 
was:<[Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was 
set.Using the result of 'hadoop classpath' to augment the Hadoop classpath: 
/usr/hdp/2.5.0.0-1245/hadoop/conf:/usr/hdp/2.5.0.0-1245/hadoop/lib/*:/usr/hdp/2.5.0.0-1245/hadoop/.//*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/./:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-hdfs/.//*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-yarn/.//*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/lib/*:/usr/hdp/2.5.0.0-1245/hadoop-mapreduce/.//*:/usr/hdp/2.5.0.0-1245/tez/*:/usr/hdp/2.5.0.0-1245/tez/lib/*:/usr/hdp/2.5.0.0-1245/tez/conf]104857600>
{code}
$HADOOP_CONF_DIR was not set prior to running the test.
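The mismatch above is just the extra shell output (the HADOOP_CONF_DIR and classpath messages) being prepended to the expected number. One way to make the comparison robust can be sketched in Python as follows (illustrative only, not the actual test code): parse the last integer out of the script's stdout instead of comparing the raw output.

```python
import re

def extract_size(script_output: str) -> int:
    """Return the last integer in the script's stdout, ignoring any
    informational lines printed before it."""
    matches = re.findall(r"\d+", script_output)
    if not matches:
        raise ValueError("no size found in script output")
    return int(matches[-1])

# Clean environment: the output is just the number.
assert extract_size("1000") == 1000
# Polluted environment: informational messages precede the number.
polluted = ("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no "
            "HADOOP_CONF_DIR was set.1000")
assert extract_size(polluted) == 1000
```

An alternative (arguably cleaner) fix is simply to export HADOOP_CONF_DIR in the test environment so the script prints nothing but the number.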



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Changing parallelism on BucketingSink

2017-08-21 Thread Felix Cheung
Hi,

I'm implementing a custom sink. The job reads a DataStream into this
custom sink.
I'd like to maximize the parallelism to use all available slots in
the cluster, but
write to a smaller set of files in the final output.

When I implement this sink with DataStream.writeAsText, I get a DataStreamSink,
which has the setParallelism() method.
However, when I implement it using BucketingSink, to leverage the ability to
bucket to paths and limit file sizes, there seems to be no available option to
change the parallelism.
This doesn't seem to be available in AbstractRichFunction, RichSinkFunction, or
SinkFunction either?

It seems the only way is to change the default parallelism on the "current" 
ExecutionEnvironment, before calling addSink on the DataStream?

Any suggestion would be appreciated!



[jira] [Created] (FLINK-7487) test instability in ClassLoaderITCase (no resources available)

2017-08-21 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-7487:
--

 Summary: test instability in ClassLoaderITCase (no resources 
available)
 Key: FLINK-7487
 URL: https://issues.apache.org/jira/browse/FLINK-7487
 Project: Flink
  Issue Type: Bug
  Components: Tests
Affects Versions: 1.4.0
Reporter: Nico Kruber


This is the stack trace from https://travis-ci.org/NicoK/flink/jobs/266772103 
which contains quite a few changes, but the error itself should be unrelated:

{code}
testKMeansJobWithCustomClassLoader(org.apache.flink.test.classloading.ClassLoaderITCase)
  Time elapsed: 0.604 sec  <<< ERROR!
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:542)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at 
org.apache.flink.test.classloading.ClassLoaderITCase.testKMeansJobWithCustomClassLoader(ClassLoaderITCase.java:232)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:930)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
at 
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:873)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at 
scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not 
enough free slots available to run the job. You can decrease the operator 
parallelism or increase the number of slots per TaskManager in the 
configuration. Task to schedule: < Attempt #0 (Map (Map at 
main(KMeansForTest.java:67)) (4/4)) @ (unassigned) - [SCHEDULED] > with groupID 
< 704e8c44f1c3edc91e03431408eb561d > in sharing group < SlotSharingGroup 
[727c589bfbe7c65aa4ffc75585a1e7e7, f82d7994fbfdd0aecab2c7f54e58f0c1, 
62039db00aa28f9de4fa3df3b89fbc7d, 704e8c44f1c3edc91e03431408eb561d, 
208a859a78f987562b4e8dcad6e90582, 9b9f002f990306532d6f153b38835b6f, 
30f3d92eacc3068d3545693fe084a6b8, 74da3f65164120b4781de360723e60c0] >. 
Resources available to scheduler: Number of instances=2, total number of 
slots=4, available slots=0
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.scheduleTask(Scheduler.java:261)
at 
org.apache.flink.runtime.jobmanager.scheduler.Scheduler.allocateSlot(Scheduler.java:138)
at 
org.apache.flink.runtime.executiongraph.Execution.allocateSlotForExecution(Execution.java:362)
at 
org.apache.flink.runtime.executiongraph.Execution.scheduleForExecution(Execution.java:304)
at 
org.apache.flink.runtime.executiongraph.ExecutionVertex.scheduleForExecution(ExecutionVertex.java:596)
at 
org.apache.flink.runtime.executiongraph.Execution.lambda$scheduleOrUpdateConsumers$4(Execution.java:567)
at 
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{code}

It seems that the job started in `testDisposeSavepointWithCustomKvState` is not
properly shut down after the test method exits, and (parts of) it remain and
block resources for subsequent tests. Copying the relevant parts of the log here:
{code}
13:46:30,887 INFO  org.apache.flink.test.classloading.ClassLoaderITCase 
 - 

Test 

[jira] [Created] (FLINK-7486) flink-mesos: Support for adding unique attribute / group_by attribute constraints

2017-08-21 Thread Bhumika Bayani (JIRA)
Bhumika Bayani created FLINK-7486:
-

 Summary: flink-mesos: Support for adding unique attribute / 
group_by attribute constraints
 Key: FLINK-7486
 URL: https://issues.apache.org/jira/browse/FLINK-7486
 Project: Flink
  Issue Type: Improvement
  Components: Mesos
Affects Versions: 1.3.2
Reporter: Bhumika Bayani


In our setup, we have multiple mesos-workers. In spite of this, the flink
application master most of the time ends up spawning all task managers on the
same mesos-worker.

We intend to ensure HA of the task managers. We would like to make sure each
task-manager runs on a different mesos-worker, and on a mesos-worker that does
not share the AZ attribute with earlier task manager instances.

Netflix Fenzo supports adding UniqueHostAttribute and BalancedHostAttribute
constraints. Flink-mesos should also let us add these kinds of constraints.
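For illustration, the kind of placement decision a unique-attribute constraint makes can be sketched as follows (plain Python with hypothetical names; not Fenzo's or Flink's actual API):

```python
def pick_worker(workers, placed, attribute="AZ"):
    """Pick a worker whose value for `attribute` differs from that of
    every worker already hosting a task manager (a 'unique attribute'
    constraint); return None if no such worker exists."""
    used = {w[attribute] for w in placed}
    for w in workers:
        if w[attribute] not in used:
            return w
    return None

workers = [{"id": "w1", "AZ": "us-east-1a"},
           {"id": "w2", "AZ": "us-east-1a"},
           {"id": "w3", "AZ": "us-east-1b"}]
# One task manager already runs in us-east-1a, so the next one must
# land on w3, the only worker in a different AZ.
assert pick_worker(workers, placed=[workers[0]])["id"] == "w3"
```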





[jira] [Created] (FLINK-7485) Using DataView interface to improve (MIN/MAX)WithRetractAggFunction.

2017-08-21 Thread sunjincheng (JIRA)
sunjincheng created FLINK-7485:
--

 Summary: Using DataView interface to improve 
(MIN/MAX)WithRetractAggFunction.
 Key: FLINK-7485
 URL: https://issues.apache.org/jira/browse/FLINK-7485
 Project: Flink
  Issue Type: Improvement
Reporter: sunjincheng


Currently MIN/MAX use an in-memory {{HashMap}} to store all values; after
FLINK-7206 we can improve them by using {{DataView}}.
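For context, a minimal Python sketch of why a retractable MAX needs that map (illustrative logic only; the actual functions are Scala, and the point of the JIRA is to back the map with {{DataView}} so it can live in state rather than on the heap):

```python
class MaxWithRetract:
    """Retractable MAX: keep a value -> count map so that a retraction
    can remove one occurrence and the max can be recomputed when the
    current maximum disappears."""
    def __init__(self):
        self.counts = {}
        self.max = None

    def accumulate(self, v):
        self.counts[v] = self.counts.get(v, 0) + 1
        if self.max is None or v > self.max:
            self.max = v

    def retract(self, v):
        c = self.counts.get(v, 0)
        if c <= 1:
            self.counts.pop(v, None)
            if v == self.max:  # recompute max from remaining values
                self.max = max(self.counts) if self.counts else None
        else:
            self.counts[v] = c - 1
```

Without the map, retracting the current maximum would leave no way to know the next-largest remaining value.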





[jira] [Created] (FLINK-7484) com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5

2017-08-21 Thread Shashank Agarwal (JIRA)
Shashank Agarwal created FLINK-7484:
---

 Summary: com.esotericsoftware.kryo.KryoException: 
java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
 Key: FLINK-7484
 URL: https://issues.apache.org/jira/browse/FLINK-7484
 Project: Flink
  Issue Type: Bug
  Components: DataStream API, Scala API
Affects Versions: 1.3.2
 Environment: Flink 1.3.2 , Yarn Cluster, FsStateBackend
Reporter: Shashank Agarwal


I am using many CEPs and a Global Window. I sometimes get the following error
and the application crashes. I have checked that logically there is no flaw in
the program. Here ItemPojo is a POJO class and we are using
java.util.List[ItemPojo]. We are using the Scala DataStream API; please find
the attached logs.


{code}
2017-08-17 10:04:12,814 INFO  org.apache.flink.runtime.taskmanager.Task 
- TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
 co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1) switched from 
RUNNING to FAILED.
com.esotericsoftware.kryo.KryoException: java.lang.IndexOutOfBoundsException: 
Index: 7, Size: 5
Serialization trace:
category (co.thirdwatch.pojo.ItemPojo)
underlying (scala.collection.convert.Wrappers$SeqWrapper)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:43)
at com.twitter.chill.TraversableSerializer.read(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
at 
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:190)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:101)
at 
org.apache.flink.api.scala.typeutils.CaseClassSerializer.copy(CaseClassSerializer.scala:32)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
at 
org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:34)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
at 
org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
at 
org.apache.flink.runtime.state.heap.HeapListState.add(HeapListState.java:77)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:442)
at 
org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException: Index: 7, Size: 5
at java.util.ArrayList.rangeCheck(ArrayList.java:653)
at java.util.ArrayList.get(ArrayList.java:429)
at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
at com.esotericsoftware.kryo.Kryo.readObjectOrNull(Kryo.java:728)
at 
com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:113)
... 22 more
2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task 
- Freeing task resources for TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
 co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 
WindowedStream.apply(WindowedStream.scala:582)) -> Flat Map -> Map -> Sink: 
Saving CSV Features Sink (1/2) (06c0d4d231bc620ba9e7924b9b0da8d1).
2017-08-17 10:04:12,816 INFO  org.apache.flink.runtime.taskmanager.Task 
- Ensuring all FileSystem streams are closed for task 
TriggerWindow(GlobalWindows(), 
ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@6d36aa3c},
 co.thirdwatch.trigger.TransactionTrigger@5707c1cb, 

[jira] [Created] (FLINK-7483) BlobCache cleanup timer not reset after job re-registration

2017-08-21 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-7483:
--

 Summary: BlobCache cleanup timer not reset after job 
re-registration
 Key: FLINK-7483
 URL: https://issues.apache.org/jira/browse/FLINK-7483
 Project: Flink
  Issue Type: Bug
  Components: Distributed Coordination, Network
Affects Versions: 1.4.0
Reporter: Nico Kruber
Assignee: Nico Kruber


Since FLINK-7057, the blob cache handles cleanup via {{registerJob}} and
{{releaseJob}} calls, where the latter sets a cleanup interval. {{registerJob}},
however, forgets to reset this if the job is re-registered, and so the job's
blobs will be cleaned up even though the job is still in use!
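The intended behaviour can be sketched like this (plain Python, illustrative only; not the actual {{BlobCache}} code):

```python
import time

class BlobCacheSketch:
    """releaseJob schedules a cleanup deadline once the reference count
    drops to zero; registerJob must both bump the count and clear any
    pending deadline, otherwise a re-registered job's blobs get deleted
    while still in use."""

    def __init__(self, cleanup_interval=3600.0):
        self.cleanup_interval = cleanup_interval
        self.refs = {}       # job id -> reference count
        self.deadlines = {}  # job id -> scheduled cleanup time

    def register_job(self, job_id):
        self.refs[job_id] = self.refs.get(job_id, 0) + 1
        # The fix: re-registration cancels any pending cleanup.
        self.deadlines.pop(job_id, None)

    def release_job(self, job_id, now=None):
        now = time.time() if now is None else now
        self.refs[job_id] -= 1
        if self.refs[job_id] <= 0:
            self.deadlines[job_id] = now + self.cleanup_interval

    def cleanup_due(self, job_id, now):
        return self.deadlines.get(job_id, float("inf")) <= now

cache = BlobCacheSketch(cleanup_interval=60.0)
cache.register_job("job-1")
cache.release_job("job-1", now=0.0)     # ref count 0: cleanup scheduled
assert cache.cleanup_due("job-1", 61.0)
cache.register_job("job-1")             # re-registration cancels cleanup
assert not cache.cleanup_due("job-1", 61.0)
```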





Re: Request for contributor permissions

2017-08-21 Thread Hai Zhou
Hi Till,

I got the permissions. Thanks so much.
I hope to help out the community as much as possible.

Regards,
Hai Zhou

> On Aug 21, 2017, at 15:29, Till Rohrmann wrote:
> 
> Hi Hai Zhou,
> 
> welcome to the Flink community :-) I've given you contributor permissions.
> As a new contributor, I highly recommend that you read [1].
> 
> [1] http://flink.apache.org/how-to-contribute.html
> 
> Cheers,
> Till
> 
> On Mon, Aug 21, 2017 at 3:21 AM, Hai Zhou  wrote:
> 
>> Hi everyone,
>> My name is Hai Zhou and I am new to contributing to open source. I am from
>> China (Time zone : GMT+8:00).
>> I'm very interested in Flink and willing to contribute to it.
>> Could anybody give me the contributor permissions?
>> My jira username is "yew1eb"
>> Thank you,
>> Regards,
>> Hai Zhou
>> 



Re: Request for contributor permissions

2017-08-21 Thread Till Rohrmann
Hi Hai Zhou,

welcome to the Flink community :-) I've given you contributor permissions.
As a new contributor, I highly recommend that you read [1].

[1] http://flink.apache.org/how-to-contribute.html

Cheers,
Till

On Mon, Aug 21, 2017 at 3:21 AM, Hai Zhou  wrote:

> Hi everyone,
> My name is Hai Zhou and I am new to contributing to open source. I am from
> China (Time zone : GMT+8:00).
> I'm very interested in Flink and willing to contribute to it.
> Could anybody give me the contributor permissions?
> My jira username is "yew1eb"
> Thank you,
> Regards,
> Hai Zhou
>