Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Bowen Li
+1 for dropping support for Scala 2.10

On Tue, Sep 19, 2017 at 3:29 AM, Sean Owen  wrote:

> For the curious, here's the overall task in Spark:
>
> https://issues.apache.org/jira/browse/SPARK-14220
>
> and  most of the code-related changes:
>
> https://github.com/apache/spark/pull/18645
>
> and where it's stuck at the moment:
>
> https://mail-archives.apache.org/mod_mbox/spark-dev/201709.mbox/%
> 3CCAMAsSdKe7Os80mX7jYaD2vNWLGWioBgCb4GG55eaN_iotFxZvw%40mail.gmail.com%3E
>
>
>
> On Tue, Sep 19, 2017 at 11:07 AM Márton Balassi 
> wrote:
>
>> Hi Aljoscha,
>>
>> I am in favor of the change. No concerns on my side, just one remark: I
>> talked to Sean last week (cc'd) and he mentioned that he faced some
>> technical issues while driving the transition from 2.10 to 2.12 for
>> Spark. It had to do with changes in the scope of implicits. You might end
>> up hitting the same.
>>
>> Best,
>> Marton
>>
>> On Tue, Sep 19, 2017 at 11:56 AM, Aljoscha Krettek 
>> wrote:
>>
>>> Hi,
>>>
>>> Talking to some people I get the impression that Scala 2.10 is quite
>>> outdated by now. I would like to drop support for Scala 2.10 and my main
>>> motivation is that this would allow us to drop our custom Flakka build of
>>> Akka that we use because newer Akka versions only support Scala 2.11/2.12
>>> and we need a backported feature.
>>>
>>> Are there any concerns about this?
>>>
>>> Best,
>>> Aljoscha
>>
>>
>>


Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

2017-09-19 Thread Steven Wu
> How did you kill the TaskManagers? I assume you didn't kill the JVM
process because otherwise you wouldn't see the finalizer objects piling up.

Till, I configured Chaos Monkey to always kill the newest/same TaskManager.
So the other N-1 TaskManagers stayed up during the whole process. Each of them
experienced a job restart for each kill. Then I saw the deferred memory
cleanup by the finalizer.

On Tue, Sep 19, 2017 at 9:58 AM, Steven Wu  wrote:

> Stephan, agree that it is not a real memory leak. I haven't found it
> affecting the system, so it is just something odd for now.
>
> But if it is not really necessary, why would we want to defer memory release
> with unpredictable behavior? Can the StreamTask stop() method take care of the
> cleanup work, so that we don't need to rely on finalize() or PhantomReference?
>
> On Tue, Sep 19, 2017 at 2:56 AM, Stephan Ewen  wrote:
>
>> Hi!
>>
>> From my understanding, overriding finalize() still has some use cases and
>> is valid if done correctly, (although PhantomReference has more control
>> over the cleanup process). finalize() is still used in JDK classes as well.
>>
>> Whenever one overrides finalize(), the object cannot be immediately
>> garbage collected because the finalize() method may make it reachable
>> again. It results in the following life cycle:
>>
>>   1) object becomes unreachable, is detected eligible for GC
>>   2) In the GC cycle, object is NOT collected, but finalize() is called
>>   3) If object is still not reachable, it will be collected in the
>> subsequent GC cycle
>>
>> In essence, objects that override finalize() stay for one more GC cycle.
>> That may be what you are seeing. It should not be a real memory leak, but
>> deferred memory release.
>>
>> Is this a problem that is affecting the system, or only something that
>> seems odd for now?
>>
>> If you are very concerned about this, would you be up to contribute a
>> change that uses a PhantomReference and Reference Queue for cleanup instead?
>>
>> Stephan
>>
>>
>> On Tue, Sep 19, 2017 at 12:56 AM, Till Rohrmann 
>> wrote:
>>
>>> Hi Steven,
>>>
>>> the finalize method in StreamTask acts as a safety net in case the
>>> services of the StreamTask haven't been properly shut down. In the code,
>>> however, it looks as if the TimerService, for example, is always being
>>> stopped in the finally block of the invoke method. Thus, it might not be
>>> necessary to have the finalize method as a safety net.
>>>
>>> How did you kill the TaskManagers? I assume you didn't kill the JVM
>>> process because otherwise you wouldn't see the finalizer objects piling up.
>>>
>>> I think that you can create a JIRA issue for removing the finalizer
>>> method.
>>>
>>> Cheers,
>>> Till
>>>
>>>
>>>
>>> On Thu, Sep 14, 2017 at 12:26 PM, Fabian Hueske 
>>> wrote:
>>>
 Hi Steven,

 thanks for reporting this issue.
 Looping in Till who's more familiar with the task lifecycles.

 Thanks, Fabian

 2017-09-12 7:08 GMT+02:00 Steven Wu :

> Hi ,
>
> I was using Chaos Monkey to test Flink's behavior against frequent
> killing of task manager nodes. I found that stopped/disposed StreamTask 
> got
> retained by the java finalizer. It is kind of like a memory leak. Since each
> StreamTask retains 2.6 MB memory, with 20 kills (and job restarts) for an
> 8-CPU container, there are 2.6 * 20 * 8 MB retained in heap.
>
> [image: Inline image 1]
>
> finalize() is generally not recommended for cleanup, because "Finalizers
> are unpredictable, often dangerous, and generally unnecessary",
> quoted from Joshua Bloch's book.
> http://www.informit.com/articles/article.aspx?p=1216151&seqNum=7
>
> This code from StreamTask.java seems to be the cause. Is it necessary?
> can it be removed? We are using flink-1.2 release branch. But I see the
> same code in flink-1.3 and master branch
>
> /**
> * The finalize method shuts down the timer. This is a fail-safe
> shutdown, in case the original
> * shutdown method was never called.
> *
> * 
> * This should not be relied upon! It will cause shutdown to happen
> much later than if manual
> * shutdown is attempted, and cause threads to linger for longer than
> needed.
> */
> @Override
> protected void finalize() throws Throwable {
> super.finalize();
> if (timerService != null) {
> if (!timerService.isTerminated()) {
> LOG.info("Timer service is shutting down.");
> timerService.shutdownService();
> }
> }
>
> cancelables.close();
> }
>
> Thanks,
> Steven
>


>>>
>>
>
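
For illustration, a minimal sketch of the PhantomReference plus ReferenceQueue
cleanup pattern that Stephan suggests above might look like the following. This
is plain Java, not Flink code; the TimerService class and all names here are
made up for the example, and a real system would use a dedicated cleanup thread
rather than a single blocking remove().

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;

public class PhantomCleanupSketch {

    // Stand-in for the resource that must be released (e.g. a timer service).
    static class TimerService {
        void shutdownService() { System.out.println("timer service shut down"); }
    }

    // The phantom reference holds only what is needed for cleanup, never the task itself.
    static class TaskCleanup extends PhantomReference<Object> {
        final TimerService timerService;
        TaskCleanup(Object task, ReferenceQueue<Object> queue, TimerService timerService) {
            super(task, queue);
            this.timerService = timerService;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        TimerService timerService = new TimerService();

        Object task = new Object();                        // stands in for a StreamTask
        TaskCleanup cleanup = new TaskCleanup(task, queue, timerService);

        task = null;                                       // drop the last strong reference
        System.gc();                                       // only a hint; used here so the demo finishes quickly

        // A dedicated thread would normally poll the queue; we block once for the demo.
        Reference<?> ref = queue.remove(5000);
        if (ref == cleanup) {
            cleanup.timerService.shutdownService();        // release resources without finalize()
            cleanup.clear();
        }
    }
}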


Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-19 Thread XiangWei Huang
Hi Till,
   
 Thanks for your answer. It worked when I used StandaloneMiniCluster, but
another problem is that I can't find a way to cancel
a running Flink job without shutting down the cluster. For LocalFlinkMiniCluster
I can do it with the code below:

   for (job <- cluster.getCurrentlyRunningJobsJava()) {
  cluster.stopJob(job)
   }

   Is it possible to cancel a running Flink job without shutting down a 
StandaloneMiniCluster?

Best Regards,
XiangWei
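
In case it helps the discussion, an untested sketch of cancelling a single job
from outside, using the standard ClusterClient API (Flink 1.3), is shown below.
Host, port and job id are placeholders, and whether this works against a
StandaloneMiniCluster, given the leader-session handling discussed below, is
exactly the open question in this thread.

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;

public class CancelJobSketch {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.setString("jobmanager.rpc.address", "localhost");   // placeholder host
        config.setInteger("jobmanager.rpc.port", 6123);            // placeholder port of the running cluster

        StandaloneClusterClient client = new StandaloneClusterClient(config);
        try {
            // Cancel one job by its id; the cluster itself keeps running.
            client.cancel(JobID.fromHexString("00000000000000000000000000000000")); // placeholder job id
        } finally {
            client.shutdown();
        }
    }
}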



> On 14 Sep 2017, at 16:58, Till Rohrmann wrote:
> 
> Hi XiangWei,
> 
> the problem is that the LocalFlinkMiniCluster can no longer be used in 
> combination with a RemoteExecutionEnvironment. The reason is that the 
> LocalFlinkMiniCluster now uses an internal leader election service and 
> assigns leader ids to its components. Since this is an internal service it is 
> not possible to retrieve this information like it is the case with the 
> ZooKeeper based leader election services.
> 
> Long story short, the Flink Scala shell currently does not work with a 
> LocalFlinkMiniCluster and would have to be fixed to work properly together 
> with a local execution environment. Until then, I recommend starting a local 
> standalone cluster and let the code run there.
> 
> Cheers,
> Till
> 
> 
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang wrote:
> dear all,
> 
> Below is the code i execute:
> 
> import java.io._
> import java.net.{URL, URLClassLoader}
> import java.nio.charset.Charset
> import java.util.Collections
> import java.util.concurrent.atomic.AtomicBoolean
> 
> import com.netease.atom.common.util.logging.Logging
> import com.netease.atom.interpreter.Code.Code
> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, 
> InterpreterUtils}
> import io.netty.buffer._
> import org.apache.flink.api.scala.FlinkILoop
> import org.apache.flink.client.CliFrontend
> import org.apache.flink.client.cli.CliFrontendParser
> import org.apache.flink.client.program.ClusterClient
> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, 
> ConfigConstants, GlobalConfiguration}
> import org.apache.flink.runtime.akka.AkkaUtils
> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, 
> LocalFlinkMiniCluster}
> 
> import scala.Console
> import scala.beans.BeanProperty
> import scala.collection.JavaConversions._
> import scala.collection.mutable
> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
> import scala.runtime.AbstractFunction0
> import scala.tools.nsc.Settings
> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
> 
> class FlinkInterpreter extends Interpreter {
>   private var bufferedReader: Option[BufferedReader] = None
>   private var jprintWriter: JPrintWriter = _
>   private val config = new Configuration;
>   private var cluster: LocalFlinkMiniCluster = _
>   @BeanProperty var imain: IMain = _
>   @BeanProperty var flinkILoop: FlinkILoop = _
>   private var out: ByteBufOutputStream = null
>   private var outBuf: ByteBuf = null
>   private var in: ByteBufInputStream = _
>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
> 
>   override def isOpen: Boolean = {
> isRunning.get()
>   }
> 
>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
> config.toMap.toMap.foreach(println)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
> val localCluster = new LocalFlinkMiniCluster(config, false)
> localCluster.start(true)
> val port = 
> AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
> println(s"Starting local Flink cluster (host: localhost,port: 
> ${localCluster.getLeaderRPCPort}).\n")
> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>   }
> 
>  
>   /**
>* Start flink cluster and create interpreter
>*/
>   override def open: Unit = {
> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
> out = new ByteBufOutputStream(outBuf)
> in = new ByteBufInputStream(outBuf)
> //val (host, port, yarnCluster) = 
> deployNewYarnCluster(YarnConfig(Option(1), None, None, None, Option(1), None))
> val (host, port, localCluster) = startLocalMiniCluster()
> this.cluster = localCluster
> val conf = cluster.configuration
> println(s"Connecting to Flink cluster (host:$host,port:$port)...")
> flinkILoop = new FlinkILoop(host, port, conf, None)
> val settings = new Settings()
> settings.usejavacp.value = true
> settings.Yreplsync.value = true
> flinkILoop.settings_$eq(settings)
> flinkILoop.createInterpreter()

Re: need instruction on how the Flink metric works

2017-09-19 Thread Michael Fong
I just did the same test as you had with SocketWindowWordCount, and the
counter showed up all right.

You should probably connect JConsole to localhost:28781 (or whatever port
you have your JMX server listening on).

That's how I set up the env; perhaps there are other, better ways to do it.

On Wed, Sep 20, 2017 at 9:15 AM, Jiewen Shao  wrote:

> Still got stuck, here are my steps (on my laptop)
>
> for example:
> Step1:
>
> public class MetricsTest<T> extends RichMapFunction<T, T> {
>
>
> private static final long serialVersionUID = 1L;
>
> private org.apache.flink.metrics.Meter meter;
>
> private Counter counter;
>
>
> @Override
>
> public void open(Configuration config) {
>
> this.counter = getRuntimeContext()
>
> .getMetricGroup()
>
> .counter("my-counter");
>
>
>
> this.meter = getRuntimeContext()
>
> .getMetricGroup()
>
> .meter("my-meter", new DropwizardMeterWrapper(new
> com.codahale.metrics.Meter()));
>
> }
>
>
> @Override
>
> public T map(T item) throws Exception {
>
> this.counter.inc();
>
> this.meter.markEvent();
>
> return item;
>
> }
>
> }
>
>
>
>
> And I did followings in one of the Flink sample
> (SocketWindowWordCount.java):
> Step2:
>
> DataStream<String> text = env.socketTextStream("localhost", 12345, "\n");
>
> text.map(new MetricsTest());  //<-- added this line
>
>
> Step3:
>
> mvn clean install
>
>
> step4: nc -l 12345
>
>
> step5:
>
> flink run -c [my_class_name] my.jar
>
>
> step6:  (type something under nc terminal)
>
> run jconsole, and pick the local process for this "flink run", and click
> the tab "MBeans" (I don't see my metrics other than system ones, is that
> the right place to look at?)
>
>
> and flink-conf.yaml has:
>
> # metrics
>
> metrics.reporters: jmx
>
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>
> metrics.reporter.jmx.port: 28780-28790
>
>
> and taskmanager log looks ok regarding JMX
>
>
> did I miss steps or configurations? Thanks a lot!
>
>
>
>
> On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong 
> wrote:
>
>> Hi,
>>
>> There are several possibilities:
>> 1. Please check if reporter is set up ( guide
>> 
>>  )
>> For example, I would make sure my local JMXReporter service is up and
>> running by checking taskmanager.log and search for the line:
>>
>> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>> - Started JMX server on port 28781.
>> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>> - Configured JMXReporter with {port:28780-28790}
>>
>> If for any reason the JMX server does not start up, you might see some
>> errors:
>>
>> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>>  - Configuring JMXReporter with {port=28781, class=org.apac
>> he.flink.metrics.jmx.JMXReporter}.
>> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry
>>  - Could not instantiate metrics reporter jmx. Metrics migh
>> t not be exposed/reported.
>> java.lang.RuntimeException: Could not start JMX server on any configured
>> port. Ports: 28781
>> at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.ja
>> va:126)
>> at org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:131)
>> at org.apache.flink.runtime.taskexecutor.TaskManagerServices.fr
>> omConfiguration(TaskManagerServices.java:188)
>> at org.apache.flink.runtime.taskmanager.TaskManager$.startTaskM
>> anagerComponentsAndActor(TaskManager.scala:1984)
>> at org.apache.flink.runtime.taskmanager.TaskManager$.runTaskMan
>> ager(TaskManager.scala:1823)
>> at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>> apply$mcV$sp(TaskManager.scala:1926)
>> at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>> apply(TaskManager.scala:1904)
>> at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$1.
>> apply(TaskManager.scala:1904)
>> at scala.util.Try$.apply(Try.scala:192)
>>
>>
>> Here is my local setup for conf/flink-conf.yaml for example:
>> metrics.reporters: jmx
>> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
>> metrics.reporter.jmx.port: 28780-28790
>>
>> 2. You might want to try a real streaming example which can execute
>> continuously. If I remember correctly, when the task is completed, the
>> manager seems to release the associated resources and objects. In your
>> example, it is only processing a few strings, which would finish in a matter
>> of milliseconds, before you can bring up jconsole manually.
>>
>> Hope some of these help,
>>
>>
>>
>> On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao 
>> wrote:
>>
>>> Thanks, When I started jconsole, it listed com.apache.flink.

Re: need instruction on how the Flink metric works

2017-09-19 Thread Jiewen Shao
Still got stuck, here are my steps (on my laptop)

for example:
Step1:

public class MetricsTest<T> extends RichMapFunction<T, T> {


private static final long serialVersionUID = 1L;

private org.apache.flink.metrics.Meter meter;

private Counter counter;


@Override

public void open(Configuration config) {

this.counter = getRuntimeContext()

.getMetricGroup()

.counter("my-counter");



this.meter = getRuntimeContext()

.getMetricGroup()

.meter("my-meter", new DropwizardMeterWrapper(new
 com.codahale.metrics.Meter()));

}


@Override

public T map(T item) throws Exception {

this.counter.inc();

this.meter.markEvent();

return item;

}

}




And I did followings in one of the Flink sample
(SocketWindowWordCount.java):
Step2:

DataStream<String> text = env.socketTextStream("localhost", 12345, "\n");

text.map(new MetricsTest());  //<-- added this line


Step3:

mvn clean install


step4: nc -l 12345


step5:

flink run -c [my_class_name] my.jar


step6:  (type something under nc terminal)

run jconsole, and pick the local process for this "flink run", and click
the tab "MBeans" (I don't see my metrics other than system ones, is that
the right place to look at?)


and flink-conf.yaml has:

# metrics

metrics.reporters: jmx

metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter

metrics.reporter.jmx.port: 28780-28790


and taskmanager log looks ok regarding JMX


did I miss steps or configurations? Thanks a lot!




On Mon, Sep 18, 2017 at 12:30 AM, Michael Fong 
wrote:

> Hi,
>
> There are several possibilities:
> 1. Please check if reporter is set up ( guide
> 
>  )
> For example, I would make sure my local JMXReporter service is up and
> running by checking taskmanager.log and search for the line:
>
> 2017-09-18 15:18:57,174 INFO  org.apache.flink.metrics.jmx.JMXReporter
>   - Started JMX server on port 28781.
> 2017-09-18 15:18:57,175 INFO  org.apache.flink.metrics.jmx.JMXReporter
>   - Configured JMXReporter with {port:28780-28790}
>
> If for any reason the JMX server does not start up, you might see some
> errors:
>
> 2017-09-18 15:26:04,743 INFO  org.apache.flink.runtime.metrics.MetricRegistry
>  - Configuring JMXReporter with {port=28781, class=org.apac
> he.flink.metrics.jmx.JMXReporter}.
> 2017-09-18 15:26:04,760 ERROR org.apache.flink.runtime.metrics.MetricRegistry
>  - Could not instantiate metrics reporter jmx. Metrics migh
> t not be exposed/reported.
> java.lang.RuntimeException: Could not start JMX server on any configured
> port. Ports: 28781
> at org.apache.flink.metrics.jmx.JMXReporter.open(JMXReporter.
> java:126)
> at org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:131)
> at org.apache.flink.runtime.taskexecutor.TaskManagerServices.
> fromConfiguration(TaskManagerServices.java:188)
> at org.apache.flink.runtime.taskmanager.TaskManager$.
> startTaskManagerComponentsAndActor(TaskManager.scala:1984)
> at org.apache.flink.runtime.taskmanager.TaskManager$.
> runTaskManager(TaskManager.scala:1823)
> at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$1.apply$mcV$sp(TaskManager.scala:1926)
> at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$1.apply(TaskManager.scala:1904)
> at org.apache.flink.runtime.taskmanager.TaskManager$$
> anonfun$1.apply(TaskManager.scala:1904)
> at scala.util.Try$.apply(Try.scala:192)
>
>
> Here is my local setup for conf/flink-conf.yaml for example:
> metrics.reporters: jmx
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporter.jmx.port: 28780-28790
>
> 2. You might want to try a real streaming example which can execute
> continuously. If I remember correctly, when the task is completed, the
> manager seems to release the associated resources and objects. In your
> example, it is only processing a few strings, which would finish in a matter
> of milliseconds, before you can bring up jconsole manually.
>
> Hope some of these help,
>
>
>
> On Mon, Sep 18, 2017 at 12:22 PM, Jiewen Shao 
> wrote:
>
>> Thanks, When I started jconsole, it listed 
>> com.apache.flink.runtime.jobmanager..:[port]
>> as one of the Local Process, i was able to connect to it with insecure
>> connection, but i was not able to locate the Counter metrics, I only saw
>> some system metrics.
>>
>> On Sun, Sep 17, 2017 at 7:39 PM, Michael Fong 
>> wrote:
>>
>>> Hi,
>>>
>>> You may enable metrics reporter to see the output of your metrics;
>>> counter in your example.
>>>
>>> There is brief documentation regarding metrics and reporter setup
>>> at link
>>> 

Re: the design of spilling to disk

2017-09-19 Thread Kurt Young
Copied from my earlier response to some similar question:

"Here is a short description for how it works: there are totally 3 threads
working together, one for reading, one for sorting partial data in memory,
and the last one is responsible for spilling. Flink will first figure out
how many memory it can use during the in-memory sort, and manage them as
MemorySegments. Once these memory runs out, the sorting thread will take
over these memory and do the in-memory sorting (For more details about
in-memory sorting, you can see NormalizedKeySorter). After this, the
spilling thread will write this sorted data to disk and make these memory
available again for reading. This will repeated until all data has been
processed.
Normally, the data will be read twice (one from source, and one from disk)
and write once, but if you spilled too much files, flink will first merge
some all the files and make sure the last merge step will not exceed some
limit (default 128). Hope this can help you."

Best,
Kurt
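
As a rough, single-threaded illustration of the external sort-merge idea
described above (this is not Flink's actual three-thread UnilateralSortMerger;
the types, buffer size and text-file run format are simplifications made up
for the sketch):

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.*;

public class ExternalSortSketch {

    // One spilled, sorted run on disk plus its current head element.
    static class Run {
        final BufferedReader reader;
        Integer head;
        Run(File file) throws IOException {
            reader = Files.newBufferedReader(file.toPath(), StandardCharsets.UTF_8);
            advance();
        }
        void advance() throws IOException {
            String line = reader.readLine();
            head = (line == null) ? null : Integer.valueOf(line);
        }
    }

    // Sort the buffer in memory and spill it to a temporary file (one sorted run).
    static File spill(List<Integer> buffer) throws IOException {
        Collections.sort(buffer);
        File file = File.createTempFile("run", ".txt");
        file.deleteOnExit();
        try (BufferedWriter w = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
            for (Integer v : buffer) {
                w.write(v.toString());
                w.newLine();
            }
        }
        return file;
    }

    // Sort `input` while keeping at most `maxInMemory` elements buffered, printing the result.
    static void externalSort(Iterator<Integer> input, int maxInMemory) throws IOException {
        List<File> runs = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>();

        // Phase 1: read, sort in memory, spill full buffers as sorted runs.
        while (input.hasNext()) {
            buffer.add(input.next());
            if (buffer.size() >= maxInMemory) {
                runs.add(spill(buffer));
                buffer.clear();
            }
        }
        if (!buffer.isEmpty()) {
            runs.add(spill(buffer));
        }

        // Phase 2: merge all runs with a heap (a real implementation caps the merge fan-in, e.g. at 128).
        PriorityQueue<Run> heap = new PriorityQueue<>(Comparator.comparing((Run r) -> r.head));
        for (File f : runs) {
            Run run = new Run(f);
            if (run.head != null) {
                heap.add(run);
            }
        }
        while (!heap.isEmpty()) {
            Run smallest = heap.poll();
            System.out.println(smallest.head);
            smallest.advance();
            if (smallest.head != null) {
                heap.add(smallest);    // re-insert with its new head element
            }
        }
    }

    public static void main(String[] args) throws IOException {
        List<Integer> data = new ArrayList<>();
        Random rnd = new Random(42);
        for (int i = 0; i < 1000; i++) {
            data.add(rnd.nextInt(10000));
        }
        externalSort(data.iterator(), 100);   // pretend only 100 elements fit in memory
    }
}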

On Wed, Sep 20, 2017 at 12:19 AM, Florin Dinu  wrote:

> Hi Kostas,
>
>
> Thank you for the quick reply and the tips. I will check them out !
>
>
> I would like to start by understanding the way secondary storage is used
> in batch processing.
>
> If you guys have additional pointers on that, it would certainly help me a
> lot.
>
>
> Thanks again,
>
> Florin
>
>
> --
> *From:* Kostas Kloudas 
> *Sent:* Tuesday, September 19, 2017 18:10
> *To:* Florin Dinu
> *Cc:* user@flink.apache.org; fhue...@apache.org
> *Subject:* Re: the design of spilling to disk
>
> Hi Florin,
>
> Unfortunately, there is no design document.
>
> The UnilateralSortMerger.java is used in the batch processing mode (not
> in streaming) and,
> in fact, the code dates some years back. I cc also Fabian as he may have
> more things to say on this.
>
> Now for the streaming side, Flink uses 3 state-backends, in-memory (no
> spilling and mainly useful for testing),
> filesystem and RocksDB (both eventually spill to disk but in different
> ways), and it also supports incremental
> checkpoints, i.e. at each checkpoint it only stores the diff between
> checkpoint[i] and checkpoint[i-1].
>
> For more information on Flink state and state backends, check out the
> latest talk from Stefan Richter at
> Flink Forward Berlin 2017 (https://www.youtube.com/watch?v=dWQ24wERItM).
>
> Cheers,
> Kostas
>
> On Sep 19, 2017, at 6:00 PM, Florin Dinu  wrote:
>
> Hello everyone,
>
> In our group at EPFL we're doing research on understanding and potentially
> improving the performance of data-parallel frameworks that use secondary
> storage.
> I was looking at the Flink code to understand how spilling to disk
> actually works.
> So far I got to the UnilateralSortMerger.java and its spill and reading
> threads. I also saw there are some spilling markers used.
> I am curious if there is any design document available on this topic.
> I was not able to find much online.
> If there is no such design document I would appreciate if someone could
> help me understand how these spilling markers are used.
> At a higher level, I am trying to understand how much data does Flink
> spill to disk after it has concluded that it needs to spill to disk.
>
> Thank you very much
> Florin Dinu
>
>
>


Re: Task Manager was lost/killed due to full GC

2017-09-19 Thread ShB
Thanks for your response!

Recommendation to decrease allotted memory? Which allotted memory should be
decreased?

I tried decreasing taskmanager.memory.fraction to give more memory to user
managed operations, that doesn't work beyond a point. Also tried increasing
containerized.heap-cutoff-ratio, that didn't work either.

What eventually solved the problem was increasing parallelism - throwing in
many more task managers. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Emily McMahon
Thanks Eron & Fabian.

The issue was hitting a yarn proxy url vs the node itself. For example this
worked
http://
{ip}:37716/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink

But this did not
http://
{ip}:20888/proxy/application_1504649135200_0001/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink

It's a bit confusing because the cancel API works with either, and the proxy
url sometimes works; for example, this was successful: http://
{ip}:20888/proxy/application_1504649135200_0001/jobs/cca2dd609c716a7b0a195700777e5b1f/cancel-with-savepoint/target-directory/tmp/

Cheers,
Emily


On Tue, Sep 19, 2017 at 2:37 PM, Eron Wright  wrote:

> Good news, it can be done if you carefully encode the target directory
> with percent-encoding, as per:
> https://tools.ietf.org/html/rfc3986#section-2.1
>
> For example, given the directory `s3:///savepoint-bucket/my-awesome-job`,
> which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
> able to submit the following URL:
> http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c8
> 3d/cancel-with-savepoint/target-directory/s3%3A%2F%2F%
> 2Fsavepoint-bucket%2Fmy-awesome-job
>
> And see the following in the log:
> 2017-09-19 14:27:45,939 INFO  org.apache.flink.runtime.jobmanager.JobManager
>- Trying to cancel job 5c360ded6e4b7d8db103e71d68b7c83d
> with savepoint to s3:///savepoint-bucket/my-awesome-job
>
> -Eron
>
> On Tue, Sep 19, 2017 at 1:54 PM, Fabian Hueske  wrote:
>
>> Hi Emily,
>>
>> thanks for reaching out.
>> I'm not familiar with the details of the Rest API but Ufuk (in CC) might
>> be able to help you.
>>
>> Best, Fabian
>>
>> 2017-09-19 10:23 GMT+02:00 Emily McMahon :
>>
>>> I've tried every combination I can think of to pass an s3 path as the
>>> target directory (url encode, include trailing slash, etc)
>>>
>>> I can successfully pass a local path as the target directory (ie
>>> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't
>>> think there's a problem with the jobId or rest of the url. I also verified
>>> I can create the savepoint on s3 from the command line so it's not a
>>> permission issue.
>>>
>>> Here's the same question on stack overflow
>>> 
>>>  (with
>>> the exception that they are getting a 502 whereas I'm getting a 404)
>>>
>>> using Flink 1.3.1
>>>
>>> Anyone have a working example?
>>>
>>> Thanks,
>>> Emily
>>>
>>
>>
>


Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Eron Wright
Good news, it can be done if you carefully encode the target directory with
percent-encoding, as per:
https://tools.ietf.org/html/rfc3986#section-2.1

For example, given the directory `s3:///savepoint-bucket/my-awesome-job`,
which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
able to submit the following URL:
http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c83d/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job

And see the following in the log:
2017-09-19 14:27:45,939 INFO
 org.apache.flink.runtime.jobmanager.JobManager- Trying to
cancel job 5c360ded6e4b7d8db103e71d68b7c83d with savepoint to
s3:///savepoint-bucket/my-awesome-job

-Eron
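
For what it's worth, the escaping above matches what Java's standard URLEncoder
happens to produce for this particular path (note that URLEncoder does
form-encoding, so a path containing spaces would need extra care). A tiny
sketch, reusing the example values from above:

import java.net.URLEncoder;

public class SavepointUrlSketch {
    public static void main(String[] args) throws Exception {
        String targetDirectory = "s3:///savepoint-bucket/my-awesome-job";
        String encoded = URLEncoder.encode(targetDirectory, "UTF-8");
        // Prints: s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job
        System.out.println(encoded);

        String url = "http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c83d"
                + "/cancel-with-savepoint/target-directory/" + encoded;
        System.out.println(url);
    }
}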

On Tue, Sep 19, 2017 at 1:54 PM, Fabian Hueske  wrote:

> Hi Emily,
>
> thanks for reaching out.
> I'm not familiar with the details of the Rest API but Ufuk (in CC) might
> be able to help you.
>
> Best, Fabian
>
> 2017-09-19 10:23 GMT+02:00 Emily McMahon :
>
>> I've tried every combination I can think of to pass an s3 path as the
>> target directory (url encode, include trailing slash, etc)
>>
>> I can successfully pass a local path as the target directory (ie
>> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't
>> think there's a problem with the jobId or rest of the url. I also verified
>> I can create the savepoint on s3 from the command line so it's not a
>> permission issue.
>>
>> Here's the same question on stack overflow
>> 
>>  (with
>> the exception that they are getting a 502 whereas I'm getting a 404)
>>
>> using Flink 1.3.1
>>
>> Anyone have a working example?
>>
>> Thanks,
>> Emily
>>
>
>


Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Fabian Hueske
Hi Emily,

thanks for reaching out.
I'm not familiar with the details of the Rest API but Ufuk (in CC) might be
able to help you.

Best, Fabian

2017-09-19 10:23 GMT+02:00 Emily McMahon :

> I've tried every combination I can think of to pass an s3 path as the
> target directory (url encode, include trailing slash, etc)
>
> I can successfully pass a local path as the target directory (ie
> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't think
> there's a problem with the jobId or rest of the url. I also verified I can
> create the savepoint on s3 from the command line so it's not a permission
> issue.
>
> Here's the same question on stack overflow
> 
>  (with
> the exception that they are getting a 502 whereas I'm getting a 404)
>
> using Flink 1.3.1
>
> Anyone have a working example?
>
> Thanks,
> Emily
>


Re: Problem in Flink 1.3.2 with Mesos task managers offers

2017-09-19 Thread Eron Wright
Hello, the current behavior is that Flink holds onto received offers for up
to two minutes while it attempts to provision the TMs.   Flink can combine
small offers to form a single TM, to combat fragmentation that develops
over time in a Mesos cluster.   Are you saying that unused offers aren't
being released after two minutes?

There's a log entry you should see in the JM log whenever an offer is
released:
LOG.info(s"Declined offer ${lease.getId} from ${lease.hostname()} "
  + s"of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.")

The timeout value isn't configurable at the moment, but if you're willing
to experiment by building Flink from source, you may adjust the two minute
timeout to something lower as follows.   In the `MesosFlinkResourceManager`
class, edit the `createOptimizer` method to call `withLeaseOfferExpirySecs`
on the `TaskScheduler.Builder` object.

Let us know if that helps and we'll make the timeout configurable.
-Eron

On Tue, Sep 19, 2017 at 8:58 AM, Francisco Gonzalez Barea <
francisco.gonza...@piksel.com> wrote:

> Hello guys,
>
> We have a flink 1.3.2 session deployed from Marathon json to Mesos with
> some of the following parameters as environment variables:
>
>
> *"flink_mesos.initial-tasks": "8",*
> *"flink_mesos.resourcemanager.tasks.mem": "4096",*
>
>
> And other environment variables including zookeeper, etc.
>
> The Mesos cluster is used for different applications (Kafka, ad-hoc...)
> and has fragmentation across the agents. Our problem is that the Flink
> session is getting all offers, even small ones. In case there are not
> enough offers to suit that configuration, it gets all of them, so there are
> no resources or offers free for other applications.
>
> So the question would be what is the right configuration in these cases to
> avoid using all resources for the same flink session.
>
> Thanks in advance.
> Regards
>


Re: Classpath/ClassLoader issues

2017-09-19 Thread Garrett Barton
Fabian,

 It looks like Hive instantiates both input and output formats when doing
either. I use Hive 1.2.1, and you can see in HCatUtil.getStorageHandler
where it tries to load both.  It looks like it's happening after the writes
complete and Flink is in the finish/finalize stage.  When I watch the
counters in the Flink UI, I see all output tasks marked finished, along with
bytes sent and records sent being exactly what I expect them to be.  The
first error also mentions the master; is this the Flink JobManager process
then?

The expanded stacktrace is:

Caused by: java.lang.Exception: Failed to finalize execution on master
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
at
org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
at
org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
at
org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
... 8 more
Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load
foster storage handler
at
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
at
org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
at
org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
... 14 more
Caused by: java.io.IOException: Failed to load foster storage handler
at
org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
at
org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
at
org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
at
org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
at
org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
... 16 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.o
rc.OrcInputFormat
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)


Thank you all for any help. :)

On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske  wrote:

> Hi Garrett,
>
> Flink distinguishes between two classloaders: 1) the system classloader
> which is the main classloader of the process. This classloader loads all
> jars in the ./lib folder and 2) the user classloader which loads the job
> jar.
> AFAIK, the different operators do not have distinct classloaders. So, in
> principle all operators should use the same user classloader.
>
> According to the stacktrace you posted, the OrcInputFormat cannot be found
> when you try to emit to an ORC file.
> This looks suspicious because I would rather expect the OrcOutputFormat to
> be the problem than the input format.
> Can you post more of the stacktrace? This would help to identify the spot
> in the Flink code where the exception is thrown.
>
> Thanks, Fabian
>
> 2017-09-18 18:42 GMT+02:00 Garrett Barton :
>
>> Hey all,
>>
>>  I am trying out a POC with flink on yarn.  My simple goal is to read
>> from a Hive ORC table, process some data and write to a new Hive ORC table.
>>
>> Currently I can get Flink to read the source table fine, both with using
>> The HCatalog Input format directly, and by using the flink-hcatalog
>> wrapper.  Processing the data also works fine. Dumping to console or a text
>> file also works fine.
>>
>> I'm now stuck trying to write the data out, I'm getting
>> ClassNotFoundExceptions:
>>
>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io
>> .orc.OrcInputFormat
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java

Re: Flink kafka consumer that read from two partitions in local mode

2017-09-19 Thread Fabian Hueske
Hi Tovi,

your code looks OK to me. Maybe Gordon (in CC) has an idea what is going
wrong.
Just a side note: you don't need to set the parallelism to 2 to read from
two partitions. A single consumer instance can read from multiple
partitions.

Best,
Fabian

2017-09-19 17:02 GMT+02:00 Sofer, Tovi :

> Hi,
>
>
>
> I am trying to set up a FlinkKafkaConsumer which reads from two partitions in
> local mode, using setParallelism=2.
>
> The producer writes to two partition (as it is shown in metrics report).
>
> But the consumer seems to read always from one partition only.
>
> Am I missing something in partition configuration?
>
>
>
> Code:
>
>
>
> Producer setup:
>
> Configuration localConfig = new Configuration();
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);
> env.setParallelism(2);
>
> String kafkaPort = parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());
>
> SingleOutputStreamOperator<String> fixMsgSource = env.addSource(srcMsgProvider.getFixMsgSource(),
> TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
> fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:" + kafkaPort, TOPIC_NAME, new SimpleStringSchema()))
> .name("fix_topic");
>
> env.execute("MsgSimulatorJob");
>
>
> Consumer setup:
>
> String topicName = "fix";
> Configuration conf = new Configuration();
> StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
> env.setParallelism(2);
> env.getConfig().setGlobalJobParameters(configParams); // make parameters available in the web interface
>
> DeserializationSchema<Tuple2<String, Long>> deserializationSchema = new SimpleStringAndTimestampDeserializationSchema();
> FlinkKafkaConsumer010<Tuple2<String, Long>> kafkaConsumer = new FlinkKafkaConsumer010<>(topicName, deserializationSchema,
> kafkaParams.getProperties());
>
> DataStream<Tuple2<String, Long>> fixMessagesStream = env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);
>
>
>
> As you can see in output, only 1 consumer partition seems to be used:
>
> Producer output:
>
> 2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source:
> random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
>
> 2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink:
> fix_topic.1.numRecordsInPerSecond: 19836.0333
>
> 2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink:
> fix_topic.0.numRecordsInPerSecond: 20337.9334
>
> 2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source:
> random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
>
> Consumer output:
>
> 2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate:
> 982.0051413881748
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.2666
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming
> Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
>
> 2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1]
> 127

Re: Delay in Flink timers

2017-09-19 Thread Narendra Joshi
The number of timers is about 400 per second. We have observed that onTimer
calls are delayed only when the number of scheduled timers starts
increasing from a minimum. It would be great if you could share pointers to
the code I can look at to understand it better. :)

Narendra Joshi
On 14 Sep 2017 16:04, "Aljoscha Krettek"  wrote:

> Hi,
>
> Yes, execution of these methods is protected by a synchronized block. This
> is not a fair lock so incoming data might starve timer callbacks. What is
> the number of timers we are talking about here?
>
> Best,
> Aljoscha
>
> > On 11. Sep 2017, at 19:38, Chesnay Schepler  wrote:
> >
> > It is true that onTimer and processElement are never called at the same
> time.
> >
> > I'm not entirely sure whether there is any prioritization/fairness
> between these methods
> > (if not if could be that onTimer is starved) , looping in Aljoscha who
> hopefully knows more
> > about this.
> >
> > On 10.09.2017 09:31, Narendra Joshi wrote:
> >> Hi,
> >>
> >> We are using Flink as a timer scheduler and delay in timer execution is
> >> a huge problem for us. What we have experienced is that as the number of
> >> Timers we register increases the timers start getting delayed (for more
> >> than 5 seconds). Can anyone point us in the right direction to figure
> >> out what might be happening?
> >>
> >> I have been told that `onTimer` and `processElement` are called with a
> >> mutually exclusive lock. Could this locking be the reason this is
> >> happening? In both the functions there is no IO happening and it should
> >> not take 5 seconds.
> >>
> >> Is it possible that calls to `processElement` starve `onTimer` calls?
> >>
> >>
> >> --
> >> Narendra Joshi
> >>
> >
>
>
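
To make the starvation scenario Aljoscha describes concrete, here is a small
self-contained Java sketch (not Flink code): a busy thread that keeps
re-acquiring an unfair intrinsic monitor can delay a second thread that only
needs the lock occasionally, similar to processElement holding the lock while
onTimer waits. The sleep times and loop counts are arbitrary.

public class MonitorStarvationDemo {

    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        // "processElement": grabs the monitor in a tight loop and holds it for 5 ms each time.
        Thread busy = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                synchronized (lock) {
                    sleepQuietly(5);
                }
                // No pause between acquisitions, so the monitor is almost always taken.
            }
        });
        busy.setDaemon(true);
        busy.start();

        // "onTimer": tries to get the monitor once per second and measures how long it waited.
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
            long start = System.nanoTime();
            synchronized (lock) {
                long waitedMillis = (System.nanoTime() - start) / 1_000_000;
                System.out.println("timer callback waited " + waitedMillis + " ms for the lock");
            }
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}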


Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

2017-09-19 Thread Steven Wu
Stephan, agree that it is not a real memory leak. I haven't found it
affecting the system, so it is just something odd for now.

But if it is not really necessary, why would we want to defer memory release
with unpredictable behavior? Can the StreamTask stop() method take care of the
cleanup work, so that we don't need to rely on finalize() or PhantomReference?

On Tue, Sep 19, 2017 at 2:56 AM, Stephan Ewen  wrote:

> Hi!
>
> From my understanding, overriding finalize() still has some use cases and
> is valid if done correctly, (although PhantomReference has more control
> over the cleanup process). finalize() is still used in JDK classes as well.
>
> Whenever one overrides finalize(), the object cannot be immediately
> garbage collected because the finalize() method may make it reachable
> again. It results in the following life cycle:
>
>   1) object becomes unreachable, is detected eligible for GC
>   2) In the GC cycle, object is NOT collected, but finalize() is called
>   3) If object is still not reachable, it will be collected in the
> subsequent GC cycle
>
> In essence, objects that override finalize() stay for one more GC cycle.
> That may be what you are seeing. It should not be a real memory leak, but
> deferred memory release.
>
> Is this a problem that is affecting the system, or only something that
> seems odd for now?
>
> If you are very concerned about this, would you be up to contribute a
> change that uses a PhantomReference and Reference Queue for cleanup instead?
>
> Stephan
>
>
> On Tue, Sep 19, 2017 at 12:56 AM, Till Rohrmann 
> wrote:
>
>> Hi Steven,
>>
>> the finalize method in StreamTask acts as a safety net in case the
>> services of the StreamTask haven't been properly shut down. In the code,
>> however, it looks as if the TimerService, for example, is always being
>> stopped in the finally block of the invoke method. Thus, it might not be
>> necessary to have the finalize method as a safety net.
>>
>> How did you kill the TaskManagers? I assume you didn't kill the JVM
>> process because otherwise you wouldn't see the finalizer objects piling up.
>>
>> I think that you can create a JIRA issue for removing the finalizer
>> method.
>>
>> Cheers,
>> Till
>>
>>
>>
>> On Thu, Sep 14, 2017 at 12:26 PM, Fabian Hueske 
>> wrote:
>>
>>> Hi Steven,
>>>
>>> thanks for reporting this issue.
>>> Looping in Till who's more familiar with the task lifecycles.
>>>
>>> Thanks, Fabian
>>>
>>> 2017-09-12 7:08 GMT+02:00 Steven Wu :
>>>
 Hi ,

 I was using Chaos Monkey to test Flink's behavior against frequent
 killing of task manager nodes. I found that stopped/disposed StreamTask got
 retained by the java finalizer. It is kind of like a memory leak. Since each
 StreamTask retains 2.6 MB memory, with 20 kills (and job restarts) for an
 8-CPU container, there are 2.6 * 20 * 8 MB retained in heap.

 [image: Inline image 1]

 finalize() is generally not recommended for cleanup, because "Finalizers
 are unpredictable, often dangerous, and generally unnecessary",
 quoted from Joshua Bloch's book.
 http://www.informit.com/articles/article.aspx?p=1216151&seqNum=7

 This code from StreamTask.java seems to be the cause. Is it necessary?
 can it be removed? We are using flink-1.2 release branch. But I see the
 same code in flink-1.3 and master branch

 /**
 * The finalize method shuts down the timer. This is a fail-safe
 shutdown, in case the original
 * shutdown method was never called.
 *
 * 
 * This should not be relied upon! It will cause shutdown to happen much
 later than if manual
 * shutdown is attempted, and cause threads to linger for longer than
 needed.
 */
 @Override
 protected void finalize() throws Throwable {
 super.finalize();
 if (timerService != null) {
 if (!timerService.isTerminated()) {
 LOG.info("Timer service is shutting down.");
 timerService.shutdownService();
 }
 }

 cancelables.close();
 }

 Thanks,
 Steven

>>>
>>>
>>
>


Re: the design of spilling to disk

2017-09-19 Thread Florin Dinu
Hi Kostas,


Thank you for the quick reply and the tips. I will check them out !


I would like to start by understanding the way secondary storage is used in 
batch processing.

If you guys have additional pointers on that, it would certainly help me a lot.


Thanks again,

Florin



From: Kostas Kloudas 
Sent: Tuesday, September 19, 2017 18:10
To: Florin Dinu
Cc: user@flink.apache.org; fhue...@apache.org
Subject: Re: the design of spilling to disk

Hi Florin,

Unfortunately, there is no design document.

The UnilateralSortMerger.java is used in the batch processing mode (not in
streaming) and,
in fact, the code dates some years back. I cc also Fabian as he may have more 
things to say on this.

Now for the streaming side, Flink uses 3 state-backends, in-memory (no spilling 
and mainly useful for testing),
filesystem and RocksDB (both eventually spill to disk but in different ways), 
and it also supports incremental
checkpoints, i.e. at each checkpoint it only stores the diff between 
checkpoint[i] and checkpoint[i-1].

For more information on Flink state and state backends, check out the latest
talk from Stefan Richter at
Flink Forward Berlin 2017 (https://www.youtube.com/watch?v=dWQ24wERItM).

Cheers,
Kostas

On Sep 19, 2017, at 6:00 PM, Florin Dinu wrote:

Hello everyone,

In our group at EPFL we're doing research on understanding and potentially 
improving the performance of data-parallel frameworks that use secondary 
storage.
I was looking at the Flink code to understand how spilling to disk actually 
works.
So far I got to the UnilateralSortMerger.java and its spill and reading 
threads. I also saw there are some spilling markers used.
I am curious if there is any design document available on this topic.
I was not able to find much online.
If there is no such design document I would appreciate if someone could help me 
understand how these spilling markers are used.
At a higher level, I am trying to understand how much data does Flink spill to 
disk after it has concluded that it needs to spill to disk.

Thank you very much
Florin Dinu



Re: the design of spilling to disk

2017-09-19 Thread Kostas Kloudas
Hi Florin,

Unfortunately, there is no design document.

The UnilateralSortMerger.java is used in the batch processing mode (not in
streaming) and, 
in fact, the code dates some years back. I cc also Fabian as he may have more 
things to say on this.

Now for the streaming side, Flink uses 3 state-backends, in-memory (no spilling 
and mainly useful for testing),
filesystem and RocksDB (both eventually spill to disk but in different ways), 
and it also supports incremental 
checkpoints, i.e. at each checkpoint it only stores the diff between 
checkpoint[i] and checkpoint[i-1].

For more information on Flink state and state backends, check out the latest
talk from Stefan Richter at
Flink Forward Berlin 2017 (https://www.youtube.com/watch?v=dWQ24wERItM).

Cheers,
Kostas

> On Sep 19, 2017, at 6:00 PM, Florin Dinu  wrote:
> 
> Hello everyone,
> 
> In our group at EPFL we're doing research on understanding and potentially 
> improving the performance of data-parallel frameworks that use secondary 
> storage.
> I was looking at the Flink code to understand how spilling to disk actually 
> works.
> So far I got to the UnilateralSortMerger.java and its spill and reading 
> threads. I also saw there are some spilling markers used.
> I am curious if there is any design document available on this topic.
> I was not able to find much online.
> If there is no such design document I would appreciate if someone could help 
> me understand how these spilling markers are used.
> At a higher level, I am trying to understand how much data does Flink spill 
> to disk after it has concluded that it needs to spill to disk.
> 
> Thank you very much
> Florin Dinu



the design of spilling to disk

2017-09-19 Thread Florin Dinu
Hello everyone,


In our group at EPFL we're doing research on understanding and potentially 
improving the performance of data-parallel frameworks that use secondary 
storage.

I was looking at the Flink code to understand how spilling to disk actually 
works.

So far I got to the UnilateralSortMerger.java and its spill and reading 
threads. I also saw there are some spilling markers used.

I am curious if there is any design document available on this topic.

I was not able to find much online.

If there is no such design document I would appreciate if someone could help me 
understand how these spilling markers are used.

At a higher level, I am trying to understand how much data does Flink spill to 
disk after it has concluded that it needs to spill to disk.


Thank you very much

Florin Dinu


Problem in Flink 1.3.2 with Mesos task managers offers

2017-09-19 Thread Francisco Gonzalez Barea
Hello guys,

We have a flink 1.3.2 session deployed from Marathon json to Mesos with some of 
the following parameters as environment variables:


"flink_mesos.initial-tasks": "8",
"flink_mesos.resourcemanager.tasks.mem": "4096",

And other environment variables including zookeeper, etc.

The Mesos cluster is used for different applications (Kafka, ad-hoc...) and
has fragmentation across the agents. Our problem is that the Flink session is
getting all offers, even small ones. In case there are not enough offers to
suit that configuration, it gets all of them, so there are no resources or
offers free for other applications.

So the question would be what is the right configuration in these cases to 
avoid using all resources for the same flink session.

Thanks in advance.
Regards



Re: Load distribution through the cluster

2017-09-19 Thread AndreaKinn
If I apply a sharing slot as in the example:

DataStream LTzAccStream = env
.addSource(new FlinkKafkaConsumer010<>("topic", 
new
CustomDeserializer(), properties))
.assignTimestampsAndWatermarks(new 
CustomTimestampExtractor())
.map(new MapFunction, 
Event>(){ 
  @Override 
public Event map(Tuple2 
value) throws Exception { 
return new Event(value.f0, 
value.f1); 
} 
}).slotSharingGroup("group1");

is just the map operator assigned to the shared slot, or does it happen for the
entire block (addSource + assignTimestamp + map)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Elias Levy
Till,

Using 1.3.2 and like Ufuk mentioned, using S3 for checkpointing.

On Tue, Sep 19, 2017 at 4:28 AM, Till Rohrmann  wrote:

> Hi Elias,
>
> which version of Flink and which state backend are you running? I tried to
> reproduce it and wasn't successful so far.
>
> We recently changed a bit how we load the GlobalConfiguration in
> combination with dynamic properties [1]. Maybe this has affected what
> you've reported as well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-7269
>
> Cheers,
> Till
>
> On Tue, Sep 19, 2017 at 2:44 AM, Elias Levy 
> wrote:
>
>> Is there a particular reason that GlobalConfiguration is so noisy?
>>
>> The task manager log is full of "Loading configuration property" messages
>> from GlobalConfiguration each time there is a checkpoint.  Why isn't the
>> configuration read once?
>>
>
>


Re: How to use operator list state like a HashMap?

2017-09-19 Thread Tony Wei
Hi Fabian,

This is good advice, but I had already tried adding a random value to my
data and it did not seem very useful.

The key set of my data is small, around 10 ~ 20 keys. If the range of the random
number is small, the distribution might not get better; it could even get worse. I think
the reason is that KeyedStream uses murmur hash to partition keys, and that
doesn't guarantee the distribution is fair.
Of course, if the range of the random number is large enough, the probability of
an even distribution is higher. But it means I need to cache more data in state,
because the data for each original key would be spread across a larger key
set. I would prefer to avoid this situation.

Best Regards,
Tony Wei

2017-09-19 22:56 GMT+08:00 Fabian Hueske :

> Hi Tony,
>
> operator state can only be kept on the heap.
>
> One thing you could try is to add a random value to you data and keyBy on
> a composite key that consists of your original key and the random value.
> It is important though, that you actually add the random value to your
> data to ensure that the extracted key is always the same, i.e.,
> deterministic with respect to the data.
> This should evenly distribute your data and allow you to use keyed
> MapState.
>
> Hope this helps,
> Fabian
>
> 2017-09-19 15:58 GMT+02:00 Tony Wei :
>
>> Hi,
>>
>> I have a basic streaming job that continuously persist data from Kafka to
>> S3.
>> Those data would be grouped by some dimensions and a limited amount.
>>
>> Originally, I used 'keyBy' and key state to fulfill the requirement.
>> However, because the data is extremely skewed, I turned to use map
>> function to aggregate data for some partitions only, so that I can balance
>> the amount of data in each sub tasks.
>>
>> I used a HashMap to store data by different dimensions inner map function
>> and convert it to operator list state when 'snapshot()' is called.
>> But, that makes another problem. Because I can't access operator list
>> state directly like using key state in KeyedStream, I have to use heap
>> space to store those state. It leads to the limitation of the amount that I
>> can cache in map function.
>>
>> I was wondering if there is any good suggestion to deal with this problem
>> or how to use operator list state like this scenario with a better manner.
>> Thank you.
>>
>>
>> Best Regards,
>> Tony Wei
>>
>
>


Re: Classpath/ClassLoader issues

2017-09-19 Thread Fabian Hueske
Hi Garrett,

Flink distinguishes between two classloaders: 1) the system classloader, which is
the main classloader of the process and loads all jars in the ./lib folder, and
2) the user-code classloader, which loads the job jar.
AFAIK, the different operators do not have distinct classloaders. So, in
principle, all operators should use the same user-code classloader.

According to the stacktrace you posted, the OrcInputFormat cannot be found
when you try to emit to an ORC file.
This looks suspicious because I would rather expect the OrcOutputFormat to
be the problem than the input format.
Can you post more of the stacktrace? This would help to identify the spot
in the Flink code where the exception is thrown.
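
In the meantime, one way to narrow this down is to log, from inside the user code,
which classloader (if any) can resolve the missing class. A rough sketch (the mapper
is just a placeholder to host the probe in open(); only the class name string comes
from the exception you posted):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class ClassLoaderProbe extends RichMapFunction<String, String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        ClassLoader userLoader = getRuntimeContext().getUserCodeClassLoader();
        System.out.println("User code classloader: " + userLoader);
        try {
            Class<?> orc = Class.forName(
                    "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat", false, userLoader);
            System.out.println("OrcInputFormat loaded by: " + orc.getClassLoader());
        } catch (ClassNotFoundException e) {
            System.out.println("OrcInputFormat is NOT visible to the user code classloader");
        }
    }

    @Override
    public String map(String value) {
        return value;
    }
}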

Thanks, Fabian

2017-09-18 18:42 GMT+02:00 Garrett Barton :

> Hey all,
>
>  I am trying out a POC with flink on yarn.  My simple goal is to read from
> a Hive ORC table, process some data and write to a new Hive ORC table.
>
> Currently I can get Flink to read the source table fine, both with using
> The HCatalog Input format directly, and by using the flink-hcatalog
> wrapper.  Processing the data also works fine. Dumping to console or a text
> file also works fine.
>
> I'm now stuck trying to write the data out, I'm getting
> ClassNotFoundExceptions:
>
> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>
> Since I read from an Orc table, I know I have that class in my classpath.
> So I'm wondering if each stage/step in a flink process has some kind of
> special classloader that I am not aware of?  (also its odd that it wants
> the inputformat and not the outputformat, not sure why yet)
>
> My output code looks like this:
>
>
> Job job = Job.getInstance(conf);
>
> HCatOutputFormat.setOutput(job, OutputJobInfo.create("schema",
> "table",null));
> HCatSchema outSchema = HCatOutputFormat.getTableSchem
> a(job.getConfiguration());
> HCatOutputFormat.setSchema(job.getConfiguration(), outSchema);
>
> HCatOutputFormat outputFormat = new HCatOutputFormat();
>
> HadoopOutputFormat out = new
> HadoopOutputFormat(outputFormat, job);
>
> // from previous processing step
> hcat.output(out);
> env.execute("run");
>
>
>
> One other thing to note, I had to put flink-hadoop-compatibility_2.11-1.3.2.jar
> into the lib folder of the flink distro.  Building my code in a shaded jar
> with that dependency did not work for me.  However when I put the hive/hcat
> jars in the lib folder it caused lots of other errors.  Since the shading
> didn't work for the hadoop-compatability jar it makes me think there is
> some funky class loader stuff going on.  I don't understand why this doesn't
> work.  The orc code is shaded and verified in my jar, the classes are
> present, plus I successfully read from an ORC table.
>
> Any help or explanation into how the classpath/classloading works would be
> wonderful!
>


Flink kafka consumer that read from two partitions in local mode

2017-09-19 Thread Sofer, Tovi
Hi,

I am trying to set up a FlinkKafkaConsumer that reads from two partitions in
local mode, using setParallelism(2).
The producer writes to two partitions (as shown in the metrics report).
But the consumer seems to always read from one partition only.
Am I missing something in the partition configuration?

Code:


Producer setup:
Configuration localConfig = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironment(parallelism, localConfig);

env.setParallelism(2);

String kafkaPort = 
parameters.get(SimulatorConfig.ParamsEnum.KAFKA_PORT.fullName());

SingleOutputStreamOperator fixMsgSource = 
env.addSource(srcMsgProvider.getFixMsgSource(), 
TypeInformation.of(String.class)).name(sourceGenerationType.getValue());
fixMsgSource.addSink(new FlinkKafkaProducer010<>("localhost:"  + kafkaPort, 
TOPIC_NAME, new SimpleStringSchema()))

.name("fix_topic");

env.execute("MsgSimulatorJob");


Consumer setup:

String topicName = "fix";
Configuration conf = new Configuration();
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

env.setParallelism(2);
env.getConfig().setGlobalJobParameters(configParams); // make parameters 
available in the web interface
DeserializationSchema<...> deserializationSchema =
        new SimpleStringAndTimestampDeserializationSchema();
FlinkKafkaConsumer010<...> kafkaConsumer =
        new FlinkKafkaConsumer010<>(topicName, deserializationSchema,
                kafkaParams.getProperties());

DataStream<...> fixMessagesStream =
        env.addSource(kafkaConsumer).name("fix_topic").setParallelism(2);

As you can see in output, only 1 consumer partition seems to be used:
Producer output:
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.1.numRecordsInPerSecond: 19836.0333
2017-09-19 14:40:45,818 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Sink: 
fix_topic.0.numRecordsInPerSecond: 20337.9334
2017-09-19 14:40:45,819 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.ca94a7e34623274bd560c7115f842acf.MsgSimulatorJob.Source: 
random -> Sink: fix_topic.0.numRecordsInPerSecond: 0.0
Consumer output:
2017-09-19 14:40:45,116 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.select-rate: 1928.1421153709368
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.commit-rate: 0.21623491761449637
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.outgoing-byte-rate: 982.0051413881748
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.sync-rate: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.KafkaConsumer.io-ratio: 0.01148712465103046
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsOutPerSecond: 6625.2666
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.1.numBytesOutPerSecond: 1.40222884E7
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesInRemotePerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numRecordsInPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesOutPerSecond: 10.5
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskmanager.7e87ec82a8abb8ee76e43ad29f682291.Flink Streaming 
Job.Source: fix_topic.0.numBytesInLocalPerSecond: 0.0
2017-09-19 14:40:45,117 INFO  - [Flink-MetricRegistry-1] 
127.0.0.1.taskma

Re: How to use operator list state like a HashMap?

2017-09-19 Thread Fabian Hueske
Hi Tony,

operator state can only be kept on the heap.

One thing you could try is to add a random value to your data and keyBy on a
composite key that consists of your original key and the random value.
It is important though, that you actually add the random value to your data
to ensure that the extracted key is always the same, i.e., deterministic
with respect to the data.
This should evenly distribute your data and allow you to use keyed MapState.
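
A rough sketch of that idea, as a fragment to drop into a job (the Tuple2<String,
Long> element type and the salt range of 8 are made up, not from this thread;
`records` stands in for the skewed input stream):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.concurrent.ThreadLocalRandom;

DataStream<Tuple3<String, Long, Integer>> salted = records
        .map(new MapFunction<Tuple2<String, Long>, Tuple3<String, Long, Integer>>() {
            @Override
            public Tuple3<String, Long, Integer> map(Tuple2<String, Long> r) {
                // attach the salt to the record itself so the key extractor stays deterministic
                return Tuple3.of(r.f0, r.f1, ThreadLocalRandom.current().nextInt(8));
            }
        });

// key by (original key, salt) so a hot key is spread over up to 8 keyed partitions
salted.keyBy(new KeySelector<Tuple3<String, Long, Integer>, Tuple2<String, Integer>>() {
    @Override
    public Tuple2<String, Integer> getKey(Tuple3<String, Long, Integer> r) {
        return Tuple2.of(r.f0, r.f2);
    }
});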

Hope this helps,
Fabian

2017-09-19 15:58 GMT+02:00 Tony Wei :

> Hi,
>
> I have a basic streaming job that continuously persist data from Kafka to
> S3.
> Those data would be grouped by some dimensions and a limited amount.
>
> Originally, I used 'keyBy' and key state to fulfill the requirement.
> However, because the data is extremely skewed, I turned to use map
> function to aggregate data for some partitions only, so that I can balance
> the amount of data in each sub tasks.
>
> I used a HashMap to store data by different dimensions inner map function
> and convert it to operator list state when 'snapshot()' is called.
> But, that makes another problem. Because I can't access operator list
> state directly like using key state in KeyedStream, I have to use heap
> space to store those state. It leads to the limitation of the amount that I
> can cache in map function.
>
> I was wondering if there is any good suggestion to deal with this problem
> or how to use operator list state like this scenario with a better manner.
> Thank you.
>
>
> Best Regards,
> Tony Wei
>


Re: Clean GlobalWidnow state

2017-09-19 Thread gerardg
The UUIDs are assigned. 

As far as I can see (inspecting the metrics and how the task behaves) the
mergeElements apply function receives all the elements (the main element and
the other elements that it expects) so it seems that the correlation is
correct. Also, nothing indicates that there are elements lost inside the
window (everything that enters goes out).

Thanks,

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Ufuk Celebi
PS: To answer the question. No, I think there is no reason for this
and it shouldn't happen. :-(

On Tue, Sep 19, 2017 at 2:44 AM, Elias Levy  wrote:
> Is there a particular reason that GlobalConfiguration is so noisy?
>
> The task manager log is full of "Loading configuration property" messages
> from GlobalConfiguration each time there is a checkpoint.  Why isn't the
> configuration read once?


Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Ufuk Celebi
I saw this too recently when using HadoopFileSystem for checkpoints
(HDFS or S3). I thought I had opened an issue for this, but I didn't.
Here it is: https://issues.apache.org/jira/browse/FLINK-7643


On Tue, Sep 19, 2017 at 1:28 PM, Till Rohrmann  wrote:
> Hi Elias,
>
> which version of Flink and which state backend are you running? I tried to
> reproduce it and wasn't successful so far.
>
> We recently changed a bit how we load the GlobalConfiguration in combination
> with dynamic properties [1]. Maybe this has affected what you've reported as
> well.
>
> [1] https://issues.apache.org/jira/browse/FLINK-7269
>
> Cheers,
> Till
>
> On Tue, Sep 19, 2017 at 2:44 AM, Elias Levy 
> wrote:
>>
>> Is there a particular reason that GlobalConfiguration is so noisy?
>>
>> The task manager log is full of "Loading configuration property" messages
>> from GlobalConfiguration each time there is a checkpoint.  Why isn't the
>> configuration read once?
>
>


How to use operator list state like a HashMap?

2017-09-19 Thread Tony Wei
Hi,

I have a basic streaming job that continuously persist data from Kafka to
S3.
Those data would be grouped by some dimensions and a limited amount.

Originally, I used 'keyBy' and key state to fulfill the requirement.
However, because the data is extremely skewed, I turned to use map function
to aggregate data for some partitions only, so that I can balance the
amount of data in each sub tasks.

I used a HashMap to store data by different dimensions inner map function
and convert it to operator list state when 'snapshot()' is called.
But, that makes another problem. Because I can't access operator list state
directly like using key state in KeyedStream, I have to use heap space to
store those state. It leads to the limitation of the amount that I can
cache in map function.

I was wondering if there is any good suggestion to deal with this problem
or how to use operator list state like this scenario with a better manner.
Thank you.


Best Regards,
Tony Wei


Re: Noisy org.apache.flink.configuration.GlobalConfiguration

2017-09-19 Thread Till Rohrmann
Hi Elias,

which version of Flink and which state backend are you running? I tried to
reproduce it and wasn't successful so far.

We recently changed a bit how we load the GlobalConfiguration in
combination with dynamic properties [1]. Maybe this has affected what
you've reported as well.

[1] https://issues.apache.org/jira/browse/FLINK-7269

Cheers,
Till

On Tue, Sep 19, 2017 at 2:44 AM, Elias Levy 
wrote:

> Is there a particular reason that GlobalConfiguration is so noisy?
>
> The task manager log is full of "Loading configuration property" messages
> from GlobalConfiguration each time there is a checkpoint.  Why isn't the
> configuration read once?
>


Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Sean Owen
For the curious, here's the overall task in Spark:

https://issues.apache.org/jira/browse/SPARK-14220

and  most of the code-related changes:

https://github.com/apache/spark/pull/18645

and where it's stuck at the moment:

https://mail-archives.apache.org/mod_mbox/spark-dev/201709.mbox/%3CCAMAsSdKe7Os80mX7jYaD2vNWLGWioBgCb4GG55eaN_iotFxZvw%40mail.gmail.com%3E



On Tue, Sep 19, 2017 at 11:07 AM Márton Balassi 
wrote:

> Hi Aljoscha,
>
> I am in favor of the change. No concerns on my side, just one remark that
> I have talked to Sean last week (ccd) and he mentioned that he has faced
> some technical issues while driving the transition from 2.10 to 2.12 for
> Spark. It had to do with changes in the scope of implicits. You might end
> up hitting the same.
>
> Best,
> Marton
>
> On Tue, Sep 19, 2017 at 11:56 AM, Aljoscha Krettek 
> wrote:
>
>> Hi,
>>
>> Talking to some people I get the impression that Scala 2.10 is quite
>> outdated by now. I would like to drop support for Scala 2.10 and my main
>> motivation is that this would allow us to drop our custom Flakka build of
>> Akka that we use because newer Akka versions only support Scala 2.11/2.12
>> and we need a backported feature.
>>
>> Are there any concerns about this?
>>
>> Best,
>> Aljoscha
>
>
>


Re: Clean GlobalWidnow state

2017-09-19 Thread Aljoscha Krettek
Hi,

Are the UUIDs randomly generated when calling .uuid, or are they assigned once so
that .uuid returns the same UUID when called multiple times? The former would be
problematic because we would not correctly assign state.

Best,
Aljoscha
> On 19. Sep 2017, at 11:41, Fabian Hueske  wrote:
> 
> If this would be the case, that would be a bug in Flink.
> As I said before, your implementation looked good to me.
> All state of window and trigger should be wiped if the trigger returns 
> FIRE_AND_PURGE (or PURGE) and it's clean() method is correctly implemented. 
> 
> I'll CC Aljoscha again for his opinion. 
> We might need to file a JIRA for the issue.
> 
> Thanks, 
> Fabian
> 
> 2017-09-19 11:32 GMT+02:00 gerardg  >:
> Thanks Fabian, I'll take a look to these improvements.
> 
> I was wondering if the increasing state size could be due to that the UUID
> used in the keyBy are randomly generated. Maybe even if I correctly delete
> all the state related to a given key there is still some metadata related to
> the key wandering around.
> 
> Gerard
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/ 
> 
> 



Re: Load distribution through the cluster

2017-09-19 Thread Fabian Hueske
There is no notion of "full" in Flink except that one slot will run at most
one subtask of each operator.

The scheduling depends on the structure of the job, the parallelism of the
operators, and the number of slots per TM.
It's hard to tell without knowing the details.

2017-09-19 11:57 GMT+02:00 AndreaKinn :

> So Flink use the other nodes just if one is completely "full" ?
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Márton Balassi
Hi Aljoscha,

I am in favor of the change. No concerns on my side, just one remark that I
have talked to Sean last week (ccd) and he mentioned that he has faced some
technical issues while driving the transition from 2.10 to 2.12 for Spark.
It had to do with changes in the scope of implicits. You might end up
hitting the same.

Best,
Marton

On Tue, Sep 19, 2017 at 11:56 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> Talking to some people I get the impression that Scala 2.10 is quite
> outdated by now. I would like to drop support for Scala 2.10 and my main
> motivation is that this would allow us to drop our custom Flakka build of
> Akka that we use because newer Akka versions only support Scala 2.11/2.12
> and we need a backported feature.
>
> Are there any concerns about this?
>
> Best,
> Aljoscha


Re: [DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Till Rohrmann
Given that the last maintenance release Scala 2.10.6 is from about 2 years
ago, I would also be in favour of dropping Scala 2.10 support from Flink.
This will make maintenance easier for us and allow us to drop artifacts
like Flakka.

Cheers,
Till

On Tue, Sep 19, 2017 at 11:56 AM, Aljoscha Krettek 
wrote:

> Hi,
>
> Talking to some people I get the impression that Scala 2.10 is quite
> outdated by now. I would like to drop support for Scala 2.10 and my main
> motivation is that this would allow us to drop our custom Flakka build of
> Akka that we use because newer Akka versions only support Scala 2.11/2.12
> and we need a backported feature.
>
> Are there any concerns about this?
>
> Best,
> Aljoscha


Re: NoResourceAvailable exception

2017-09-19 Thread AndreaKinn
Thank you, unfortunately it had no effect.

As I add more load to the computation, the "taskmanager killed" error appears on
the node in use, without other nodes being brought in to sustain the computation.
I also increased

akka.watch.heartbeat.interval
akka.watch.heartbeat.pause
akka.transport.heartbeat.interval
akka.transport.heartbeat.pause

obtaining just a (very) delayed error.




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: heap dump shows StoppableSourceStreamTask retained by java.lang.finalizer

2017-09-19 Thread Stephan Ewen
Hi!

From my understanding, overriding finalize() still has some use cases and
is valid if done correctly, (although PhantomReference has more control
over the cleanup process). finalize() is still used in JDK classes as well.

Whenever one overrides finalize(), the object cannot be immediately garbage
collected because the finalize() method may make it reachable again. It
results in the following life cycle:

  1) object becomes unreachable, is detected eligible for GC
  2) In the GC cycle, object is NOT collected, but finalize() is called
  3) If object is still not reachable, it will be collected in the
subsequent GC cycle

In essence, objects that override finalize() stay for one more GC cycle.
That may be what you are seeing. It should not be a real memory leak, but
deferred memory release.

Is this a problem that is affecting the system, or only something that
seems odd for now?

If you are very concerned about this, would you be up to contribute a
change that uses a PhantomReference and Reference Queue for cleanup instead?
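
For reference, a rough sketch of what such a PhantomReference-based cleanup could
look like (not Flink code: TimerService is a stand-in interface, and in a real
implementation the TimerCleanup references themselves must be kept strongly
reachable, e.g. in a set, until they are dequeued):

import java.lang.ref.PhantomReference;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;

public final class PhantomCleanupSketch {

    /** Stand-in for the real timer service; only the shutdown hook matters here. */
    interface TimerService {
        void shutdownService();
    }

    /** Phantom reference that remembers which timer service to shut down. */
    static final class TimerCleanup extends PhantomReference<Object> {
        final TimerService timerService;

        TimerCleanup(Object task, ReferenceQueue<Object> queue, TimerService timerService) {
            super(task, queue);
            this.timerService = timerService;
        }
    }

    /** Daemon thread that performs the cleanup the finalize() safety net used to do. */
    static void startCleaner(ReferenceQueue<Object> queue) {
        Thread cleaner = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // blocks until a registered task object has become unreachable
                    Reference<?> ref = queue.remove();
                    ((TimerCleanup) ref).timerService.shutdownService();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "stream-task-cleanup");
        cleaner.setDaemon(true);
        cleaner.start();
    }
}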

Stephan


On Tue, Sep 19, 2017 at 12:56 AM, Till Rohrmann 
wrote:

> Hi Steven,
>
> the finalize method in StreamTask acts as a safety net in case the
> services of the StreamTask haven't been properly shut down. In the code,
> however, it looks as if the TimerService, for example, is always being
> stopped in the finally block of the invoke method. Thus, it might not be
> necessary to have the finalize method as a safety net.
>
> How did you kill the TaskManagers? I assume you didn't kill the JVM
> process because otherwise you wouldn't see the finalizer objects piling up.
>
> I think that you can create a JIRA issue for removing the finalizer method.
>
> Cheers,
> Till
>
>
>
> On Thu, Sep 14, 2017 at 12:26 PM, Fabian Hueske  wrote:
>
>> Hi Steven,
>>
>> thanks for reporting this issue.
>> Looping in Till who's more familiar with the task lifecycles.
>>
>> Thanks, Fabian
>>
>> 2017-09-12 7:08 GMT+02:00 Steven Wu :
>>
>>> Hi ,
>>>
>>> I was using Chaos Monkey to test Flink's behavior against frequent
>>> killing of task manager nodes. I found that stopped/disposed StreamTask got
>>> retained by java finalizer. It is kind like a memory leak. Since each
>>> StreamTask retains 2.6 MB memory. With 20 kills (and job restarts) for
>>> 8-CPU container, there are 2.6 * 20 * 8 MB retained in heap.
>>>
>>> [image: Inline image 1]
>>>
>>> finalize() is generally not recommended for cleanup, because "*Finalizers
>>> are unpredictable, often dangerous, and generally unnecessary*", quoted
>>> from Joshua Bloch's book.
>>> http://www.informit.com/articles/article.aspx?p=1216151&seqNum=7
>>>
>>> This code from StreamTask.java seems to be the cause. Is it necessary?
>>> can it be removed? We are using flink-1.2 release branch. But I see the
>>> same code in flink-1.3 and master branch
>>>
>>> /**
>>> * The finalize method shuts down the timer. This is a fail-safe
>>> shutdown, in case the original
>>> * shutdown method was never called.
>>> *
>>> * 
>>> * This should not be relied upon! It will cause shutdown to happen much
>>> later than if manual
>>> * shutdown is attempted, and cause threads to linger for longer than
>>> needed.
>>> */
>>> @Override
>>> protected void finalize() throws Throwable {
>>> super.finalize();
>>> if (timerService != null) {
>>> if (!timerService.isTerminated()) {
>>> LOG.info("Timer service is shutting down.");
>>> timerService.shutdownService();
>>> }
>>> }
>>>
>>> cancelables.close();
>>> }
>>>
>>> Thanks,
>>> Steven
>>>
>>
>>
>


Re: Load distribution through the cluster

2017-09-19 Thread AndreaKinn
So Flink uses the other nodes only if one is completely "full"?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink SocketTextStream source scheduled to a single machine

2017-09-19 Thread Till Rohrmann
Hi Le Xu,

the reason why all different SocketTextStreamFunction sources are scheduled
to the same machine is because of slot sharing. Slot sharing allows Flink
to schedule tasks belonging to different operators into the same slot. This
allows, for example, to achieve better colocation between tasks which
depend on each other (e.g. build-side, probe-side and actual join operator
running in the same slot). Moreover, it makes it easier to reason about how
many slots your application needs, which is the maximum parallelism of your
job.

However, the downside is that independent components of your job won't be
spread across the cluster but usually end up in the same slot(s)
(consequently on the same machine, too) due to slot sharing.

You can disable slot sharing for parts of your job if you set explicitly a
different slot sharing group name. Then only operators which are assigned
to the same slot sharing group are subject to slot sharing. Down stream
operators inherit the slot sharing group from their inputs. Thus, if you
have an embarrassingly parallel job, then it suffices to only set the
slot sharing group at the sources.

for (int i = 0; i < hosts.length; i++) {
    DataStream<String> someStream = env
            .socketTextStream(hosts[i], ports[i])
            .slotSharingGroup("socket_" + i);

    DataStream<...> joinedAdImpressions = rawMessageStream.rebalance() ...
}

Cheers,
Till

On Mon, Sep 18, 2017 at 10:09 PM, Le Xu  wrote:

> Hello!
>
> I'm trying to figure out how it happens: I'm having a program reading from
> multiple socketTextStream and these text streams feed into different data
> flow (and these data streams never connect in my job). It looks something
> similar to below:
>
> for(int i =0; i< hosts.length; i++) {
>
> DataStream<String> someStream = env.socketTextStream(hosts[i], ports[i]);
> DataStream<...> joinedAdImpressions = rawMessageStream.rebalance() ...
>
> However, when I run the job on a cluster I found that all source task have
> been scheduled to one machine so the machine becomes a severe bottleneck
> for the performance. Any ideas how would this happen?
>
> Thanks!
>


[DISCUSS] Dropping Scala 2.10

2017-09-19 Thread Aljoscha Krettek
Hi,

Talking to some people I get the impression that Scala 2.10 is quite outdated 
by now. I would like to drop support for Scala 2.10 and my main motivation is 
that this would allow us to drop our custom Flakka build of Akka that we use 
because newer Akka versions only support Scala 2.11/2.12 and we need a 
backported feature.

Are there any concerns about this?

Best,
Aljoscha

Re: Clean GlobalWidnow state

2017-09-19 Thread Fabian Hueske
If this would be the case, that would be a bug in Flink.
As I said before, your implementation looked good to me.
All state of window and trigger should be wiped if the trigger returns
FIRE_AND_PURGE (or PURGE) and its clear() method is correctly implemented.

I'll CC Aljoscha again for his opinion.
We might need to file a JIRA for the issue.

Thanks,
Fabian

2017-09-19 11:32 GMT+02:00 gerardg :

> Thanks Fabian, I'll take a look to these improvements.
>
> I was wondering if the increasing state size could be due to that the UUID
> used in the keyBy are randomly generated. Maybe even if I correctly delete
> all the state related to a given key there is still some metadata related
> to
> the key wandering around.
>
> Gerard
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Custom Serializers

2017-09-19 Thread nragon
createInstance(Object[] fields) on TupleSerializerBase does not seem to be part
of the TypeSerializer API.
Will I be losing any functionality? In what cases do you use this instead
of createInstance()?

// We use this in the Aggregate and Distinct Operators to create instances
// of immutable Tuples (i.e. Scala Tuples)

Thanks



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Clean GlobalWidnow state

2017-09-19 Thread gerardg
Thanks Fabian, I'll take a look to these improvements.

I was wondering if the increasing state size could be due to the fact that the
UUIDs used in the keyBy are randomly generated. Maybe even if I correctly delete
all the state related to a given key, there is still some metadata related to
the key wandering around.

Gerard



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Load distribution through the cluster

2017-09-19 Thread Fabian Hueske
Hi,

Flink's scheduling aims to co-locate tasks to reduce network communication
and to ease reasoning about resource/slot consumption.
A slot can execute one subtask of each operator of a program, i.e, a
parallel slice of the program.

You can control the scheduling of tasks by specifying resource groups. [1]
[2]

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/concepts/runtime.html#task-slots-and-resources
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#task-chaining-and-resource-groups
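
As a rough illustration of those hooks in the DataStream API (the operator
implementations MyParser and MyFilter, the group name, and the socket source are
all made up; this is just a fragment, not a complete job):

DataStream<String> src = env.socketTextStream("localhost", 9999);

src.map(new MyParser())
        .slotSharingGroup("parsing")  // no longer shares slots with the default group
        .startNewChain()              // start a new operator chain at this map
        .filter(new MyFilter())
        .disableChaining();           // never chain this filter to its neighbors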

2017-09-18 15:22 GMT+02:00 AndreaKinn :

> Hi,
> I'm experimenting a bit with the cluster.
> I didn't set any options about sharing slots and chains hoping that Flink
> decided autonomously how to balance the load through the nodes of the
> cluster. My cluster is composed by one job and task manager and two task
> manager.
>
> I noted that every time I start the program, just one node is busy (at >
> 95%
> for each cpu core) while the other nodes are completely free (< 3%). Same
> arguments for the memory.
>
> So Flink doesn't balance the work on the nodes??
> I expected something like the cpu usage was distributed on every nodes.
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-19 Thread Emily McMahon
I've tried every combination I can think of to pass an s3 path as the
target directory (url encode, include trailing slash, etc)

I can successfully pass a local path as the target directory (ie
/jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't think
there's a problem with the jobId or rest of the url. I also verified I can
create the savepoint on s3 from the command line so it's not a permission
issue.

Here's the same question on Stack Overflow (with the exception that they are
getting a 502 whereas I'm getting a 404).

using Flink 1.3.1

Anyone have a working example?

Thanks,
Emily


Re: Securing Flink Monitoring REST API

2017-09-19 Thread Fabian Hueske
Thanks for the correction and the pointers Eron!

Cheers, Fabian

2017-09-18 18:34 GMT+02:00 Eron Wright :

> Unfortunately Flink does not yet support SSL mutual authentication nor any
> form of client authentication.   There is an ongoing discussion about it:
> http://apache-flink-mailing-list-archive.1008284.n3.
> nabble.com/DISCUSS-Service-Authorization-redux-td18890.html
>
> A workaround that I've seen is to use nginx as a frontend proxy.  Be sure
> to lock down the underlying endpoints somehow.  If you choose to go this
> route, Patrick Lucas gave a related talk recently (Flink in Containerland):
> https://youtu.be/w721NI-mtAA
>
> -Eron
>
>
> On Mon, Sep 18, 2017 at 1:30 AM, Fabian Hueske  wrote:
>
>> Hi,
>>
>> sorry for the late response.
>> Flink uses Netty for network communication which supports SSL client
>> authentication.
>> I haven't tried it myself, but would think that this should work in Flink
>> as well if you configure the certificates correctly.
>>
>> We should update the docs to cover this aspect.
>> Feedback on this would be very welcome
>>
>> Thanks, Fabian
>>
>> 2017-09-06 14:23 GMT+02:00 avivros :
>>
>>> Does  jobmanager.web.ssl.enabled supports Client SSL Authentication?
>>>
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nab
>>> ble.com/
>>>
>>
>>
>


Re: Clean GlobalWidnow state

2017-09-19 Thread Fabian Hueske
Hi Gerard,

I had a look at your Trigger implementation but did not spot something
suspicious that would cause the state size to grow.
However, I notices a few things that can be improved:

- use ctx.getCurrentProcessingTime() instead of System.currentTimeMillis() to
make the Trigger easier to test (there are test harnesses that can set the
processing time manually)
- timers are not overwritten, so each timeout timer will yield a callback
to onProcessingTime(). It is not possible to delete timers (so you cannot
prevent onProcessingTime() from being called multiple times), but you can
save the most recent timer timestamp as ValueState[Long] and compare against
it to only act on the last timer call (see the sketch after the snippet below).
- You can get the state objects just once and apply multiple operations on
the state object, i.e.,

var elementsToReceive = ctx.getPartitionedState(elementsToReceiveDesc)
var elementsReceived = ctx.getPartitionedState(elementsReceivedDesc)

elementsToReceive.update(x)
val cnt: Int = elementsToReceive.get()
...
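
And a rough sketch of that last-timer guard in Java (the actual trigger in this
thread is different; the descriptor name, timeout, and window/element types here
are illustrative only):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class TimeoutTrigger extends Trigger<Object, GlobalWindow> {

    private static final long TIMEOUT = 60_000L;

    private final ValueStateDescriptor<Long> latestTimerDesc =
            new ValueStateDescriptor<>("latestTimer", Long.class);

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
                                   TriggerContext ctx) throws Exception {
        long timer = ctx.getCurrentProcessingTime() + TIMEOUT;
        // remember the latest registered timer for this key/window
        ctx.getPartitionedState(latestTimerDesc).update(timer);
        ctx.registerProcessingTimeTimer(timer);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window,
                                          TriggerContext ctx) throws Exception {
        Long latest = ctx.getPartitionedState(latestTimerDesc).value();
        if (latest == null || time < latest) {
            // an older, superseded timer fired; ignore it
            return TriggerResult.CONTINUE;
        }
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(latestTimerDesc).clear();
    }
}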

Maybe Aljoscha can check the code as well and see if he finds the reason
why the state grows.

Best, Fabian

2017-09-18 15:27 GMT+02:00 gerardg :

> I may be able to better know what is happening if I could get what is being
> stored in the state. Is there any way to read the RocksDB db state?
>
> Gerard
>
>
>
> --
> Sent from: http://apache-flink-user-mailing-list-archive.2336050.
> n4.nabble.com/
>


Re: Custom Serializers

2017-09-19 Thread Chesnay Schepler
Have a look at the TupleTypeInfo class. It has a constructor that accepts an
array of TypeInformation and supports automatically generating a serializer
from them.
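
For example, something along these lines should work (the field types are
hard-coded here for illustration; in your case they would come from the user
input, so take this as a sketch rather than a tested snippet):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

// element types only known at runtime, e.g. derived from the user's configuration
TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO
};

TupleTypeInfo<Tuple> dynamicTupleType = new TupleTypeInfo<>(fieldTypes);
TypeSerializer<Tuple> serializer = dynamicTupleType.createSerializer(new ExecutionConfig());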

On 18.09.2017 18:28, nragon wrote:

One other thing :). Can i set tuple generic type dynamically?
Meaning, build a tuple of N arity and build TupleSerializer based on those
types.
This because I'll only know these types based on user inputs.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/