Re: Load distribution through the cluster

2017-09-20 Thread Chesnay Schepler

It should only apply to the map operator.

On 19.09.2017 17:38, AndreaKinn wrote:

If I apply a slot sharing group as in this example:

DataStream<Event> LTzAccStream = env
    .addSource(new FlinkKafkaConsumer010<>("topic",
        new CustomDeserializer(), properties))
    .assignTimestampsAndWatermarks(new CustomTimestampExtractor())
    .map(new MapFunction<Tuple2<...>, Event>() {
        @Override
        public Event map(Tuple2<...> value) throws Exception {
            return new Event(value.f0, value.f1);
        }
    }).slotSharingGroup("group1");

is only the map operator assigned to the shared slot, or does this apply to
the entire chain (addSource + assignTimestampsAndWatermarks + map)?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: got Warn message - "the expected leader session ID did not equal the received leader session ID " when using LocalFlinkMiniCluster to interpret scala code

2017-09-20 Thread Till Rohrmann
Hi XiangWei,

Programmatically, there is no nice tooling yet to cancel jobs on a dedicated
cluster. What you can do is use Flink's REST API to issue a cancel
command [1]. You have to send a GET request to the target URL
`/jobs/:jobid/cancel`. In the future we will improve the programmatic job
control, which will allow you to do this kind of thing more easily.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/rest_api.html#job-cancellation
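
As a rough illustration of the REST call described above, here is a minimal Java sketch that builds the `/jobs/:jobid/cancel` URL and sends a GET request to it. The class name `CancelJobSketch`, the helper names, and the timeouts are made up for this example; the web address and job ID are whatever applies to your cluster. It uses only the JDK and is not part of Flink.

```java
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;

// Hypothetical helper, not a Flink class: cancels a job via the REST endpoint
// mentioned above (GET /jobs/:jobid/cancel).
final class CancelJobSketch {

    // Builds the cancel URL from the JobManager web address and the job ID.
    static String cancelUrl(String webAddress, String jobId) {
        String base = webAddress.endsWith("/")
                ? webAddress.substring(0, webAddress.length() - 1)
                : webAddress;
        return base + "/jobs/" + jobId + "/cancel";
    }

    // Sends the GET request and returns the HTTP status code of the response.
    static int cancel(String webAddress, String jobId) throws IOException {
        HttpURLConnection conn = (HttpURLConnection)
                URI.create(cancelUrl(webAddress, jobId)).toURL().openConnection();
        try {
            conn.setRequestMethod("GET");
            conn.setConnectTimeout(5_000); // arbitrary timeouts for the sketch
            conn.setReadTimeout(5_000);
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: CancelJobSketch <web-address> <job-id>");
            return;
        }
        System.out.println("HTTP " + cancel(args[0], args[1]));
    }
}
```

The job IDs to pass in have to come from elsewhere (for example the web UI); the sketch does not look them up.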

Cheers,
Till

On Wed, Sep 20, 2017 at 5:46 AM, XiangWei Huang wrote:

> Hi Till,
>
> Thanks for your answer, it worked when I used StandaloneMiniCluster,
> but another problem is that I can't find a way to cancel
> a running Flink job without shutting down the cluster. For
> LocalFlinkMiniCluster I can do it with the code below:
>
>   for (job <- cluster.getCurrentlyRunningJobsJava()) {
>     cluster.stopJob(job)
>   }
>
> Is it possible to cancel a running Flink job without shutting down a
> StandaloneMiniCluster?
>
> Best Regards,
> XiangWei
>
>
>
> On 14 Sep 2017, at 16:58, Till Rohrmann wrote:
>
> Hi XiangWei,
>
> the problem is that the LocalFlinkMiniCluster can no longer be used in
> combination with a RemoteExecutionEnvironment. The reason is that the
> LocalFlinkMiniCluster now uses an internal leader election service and
> assigns leader ids to its components. Since this is an internal service, it
> is not possible to retrieve this information as is the case with the
> ZooKeeper-based leader election services.
>
> Long story short, the Flink Scala shell currently does not work with a
> LocalFlinkMiniCluster and would have to be fixed to work properly
> together with a local execution environment. Until then, I recommend
> starting a local standalone cluster and letting the code run there.
>
> Cheers,
> Till
>
>
> On Wed, Sep 13, 2017 at 6:21 AM, XiangWei Huang 
> wrote:
>
>> dear all,
>>
>> Below is the code I execute:
>>
>> import java.io._
>> import java.net.{URL, URLClassLoader}
>> import java.nio.charset.Charset
>> import java.util.Collections
>> import java.util.concurrent.atomic.AtomicBoolean
>>
>> import com.netease.atom.common.util.logging.Logging
>> import com.netease.atom.interpreter.Code.Code
>> import com.netease.atom.interpreter.{Code, Interpreter, InterpreterResult, InterpreterUtils}
>> import io.netty.buffer._
>> import org.apache.flink.api.scala.FlinkILoop
>> import org.apache.flink.client.CliFrontend
>> import org.apache.flink.client.cli.CliFrontendParser
>> import org.apache.flink.client.program.ClusterClient
>> import org.apache.flink.configuration.{QueryableStateOptions, Configuration, ConfigConstants, GlobalConfiguration}
>> import org.apache.flink.runtime.akka.AkkaUtils
>> import org.apache.flink.runtime.minicluster.{StandaloneMiniCluster, LocalFlinkMiniCluster}
>>
>> import scala.Console
>> import scala.beans.BeanProperty
>> import scala.collection.JavaConversions._
>> import scala.collection.mutable
>> import scala.collection.mutable.{ArrayBuffer, ListBuffer}
>> import scala.runtime.AbstractFunction0
>> import scala.tools.nsc.Settings
>> import scala.tools.nsc.interpreter.{IMain, JPrintWriter, Results}
>>
>> class FlinkInterpreter extends Interpreter {
>>   private var bufferedReader: Option[BufferedReader] = None
>>   private var jprintWriter: JPrintWriter = _
>>   private val config = new Configuration;
>>   private var cluster: LocalFlinkMiniCluster = _
>>   @BeanProperty var imain: IMain = _
>>   @BeanProperty var flinkILoop: FlinkILoop = _
>>   private var out: ByteBufOutputStream = null
>>   private var outBuf: ByteBuf = null
>>   private var in: ByteBufInputStream = _
>>   private var isRunning: AtomicBoolean = new AtomicBoolean(false)
>>
>>   override def isOpen: Boolean = {
>> isRunning.get()
>>   }
>>
>>   def startLocalMiniCluster(): (String, Int, LocalFlinkMiniCluster) = {
>> config.toMap.toMap.foreach(println)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1)
>> config.setInteger(ConfigConstants.LOCAL_NUMBER_RESOURCE_MANAGER, 1)
>> config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
>> config.setBoolean(QueryableStateOptions.SERVER_ENABLE.key(), true)
>> config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true)
>> val localCluster = new LocalFlinkMiniCluster(config, false)
>> localCluster.start(true)
>> val port = AkkaUtils.getAddress(localCluster.jobManagerActorSystems.get.head).port
>> println(s"Starting local Flink cluster (host: localhost,port: ${localCluster.getLeaderRPCPort}).\n")
>> ("localhost", localCluster.getLeaderRPCPort, localCluster)
>>   }
>>
>>
>>   /**
>>* Start flink cluster and create interpreter
>>*/
>>   override def open: Unit = {
>> outBuf = ByteBufAllocator.DEFAULT.heapBuffer(20480)
>> out = new ByteBufOutputStream(outBuf)
>> in = new ByteBufInputStream(outBuf)
>> //   

Re: [DISCUSS] Dropping Scala 2.10

2017-09-20 Thread Hai Zhou
+1

> On 19 Sep 2017, at 17:56, Aljoscha Krettek wrote:
> 
> Hi,
> 
> Talking to some people I get the impression that Scala 2.10 is quite outdated 
> by now. I would like to drop support for Scala 2.10 and my main motivation is 
> that this would allow us to drop our custom Flakka build of Akka that we use 
> because newer Akka versions only support Scala 2.11/2.12 and we need a 
> backported feature.
> 
> Are there any concerns about this?
> 
> Best,
> Aljoscha



Re: Problem in Flink 1.3.2 with Mesos task managers offers

2017-09-20 Thread Francisco Gonzalez Barea
Hello Eron,

Thank you for your reply, we will take a look at this.

Regards


On 19 Sep 2017, at 22:37, Eron Wright <eronwri...@gmail.com> wrote:

Hello, the current behavior is that Flink holds onto received offers for up to 
two minutes while it attempts to provision the TMs.   Flink can combine small 
offers to form a single TM, to combat fragmentation that develops over time in 
a Mesos cluster.   Are you saying that unused offers aren't being released 
after two minutes?

There's a log entry you should see in the JM log whenever an offer is released:
LOG.info(s"Declined offer ${lease.getId} from ${lease.hostname()} "
  + s"of ${lease.memoryMB()} MB, ${lease.cpuCores()} cpus.")

The timeout value isn't configurable at the moment, but if you're willing to 
experiment by building Flink from source, you may adjust the two minute timeout 
to something lower as follows.   In the `MesosFlinkResourceManager` class, edit 
the `createOptimizer` method to call `withLeaseOfferExpirySecs` on the 
`TaskScheduler.Builder` object.

Let us know if that helps and we'll make the timeout configurable.
-Eron

On Tue, Sep 19, 2017 at 8:58 AM, Francisco Gonzalez Barea <francisco.gonza...@piksel.com> wrote:
Hello guys,

We have a flink 1.3.2 session deployed from Marathon json to Mesos with some of 
the following parameters as environment variables:


"flink_mesos.initial-tasks": "8",
"flink_mesos.resourcemanager.tasks.mem": "4096",

And other environment variables including zookeeper, etc.

The Mesos cluster is used for different applications (Kafka, ad-hoc...), and 
its agents are fragmented. Our problem is that the Flink session is 
taking all offers, even small ones. If there are not enough offers to 
satisfy that configuration, it takes all of them, so no resources or 
offers are left free for other applications.

So the question would be what is the right configuration in these cases to 
avoid using all resources for the same flink session.

Thanks in advance.
Regards





Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-20 Thread Nico Kruber
Hi Emily,
I'm not familiar with the details of the REST API either, but if this is a 
problem with the proxy, maybe it is already decoding the encoded URL and 
passing it on un-encoded. Have you tried encoding the path again? That is, 
encoding the percent signs:

http://
{ip}:20888/proxy/application_1504649135200_0001/jobs/
1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/
s3%253A%252F%252F%252Fremit-flink
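
To make the double-encoding idea concrete, here is a small Java sketch that percent-encodes the target directory once (as for a direct request to the JobManager) and twice (as suggested above for the proxy). The class and method names are invented for this example. Note that `java.net.URLEncoder` implements form encoding, so a space would become `+` rather than `%20`; that makes no difference for a path like the one in this thread. Whether the YARN proxy really decodes once is still an assumption.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Flink.
final class SavepointPathEncoding {

    // Single percent-encoding, e.g. ':' -> %3A and '/' -> %2F.
    // URLEncoder.encode(String, Charset) requires Java 10 or later.
    static String encodeOnce(String path) {
        return URLEncoder.encode(path, StandardCharsets.UTF_8);
    }

    // Encoding the already encoded value again turns every '%' into %25,
    // so one round of decoding by an intermediary yields the single-encoded form.
    static String encodeTwice(String path) {
        return encodeOnce(encodeOnce(path));
    }

    public static void main(String[] args) {
        String target = "s3:///remit-flink";
        System.out.println(encodeOnce(target));  // s3%3A%2F%2F%2Fremit-flink
        System.out.println(encodeTwice(target)); // s3%253A%252F%252F%252Fremit-flink
    }
}
```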


Nico

On Wednesday, 20 September 2017 00:52:05 CEST Emily McMahon wrote:
> Thanks Eron & Fabian.
> 
> The issue was hitting a yarn proxy url vs the node itself. For example this
> worked
> http://{ip}:37716/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink
> 
> But this did not
> http://{ip}:20888/proxy/application_1504649135200_0001/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink
> 
> It's a bit confusing because the cancel api works with either and the proxy
> url sometimes works as this was successful
> http://{ip}:20888/proxy/application_1504649135200_0001/jobs/cca2dd609c716a7b0a195700777e5b1f/cancel-with-savepoint/target-directory/tmp/
> 
> Cheers,
> Emily
> 
> On Tue, Sep 19, 2017 at 2:37 PM, Eron Wright  wrote:
> > Good news, it can be done if you carefully encode the target directory
> > with percent-encoding, as per:
> > https://tools.ietf.org/html/rfc3986#section-2.1
> > 
> > For example, given the directory `s3:///savepoint-bucket/my-awesome-job`,
> > which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
> > able to submit the following URL:
> > http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c83d/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job
> > 
> > And see the following in the log:
> > 2017-09-19 14:27:45,939 INFO org.apache.flink.runtime.jobmanager.JobManager
> > - Trying to cancel job 5c360ded6e4b7d8db103e71d68b7c83d
> > with savepoint to s3:///savepoint-bucket/my-awesome-job
> > 
> > -Eron
> > 
> > On Tue, Sep 19, 2017 at 1:54 PM, Fabian Hueske  wrote:
> >> Hi Emily,
> >> 
> >> thanks for reaching out.
> >> I'm not familiar with the details of the Rest API but Ufuk (in CC) might
> >> be able to help you.
> >> 
> >> Best, Fabian
> >> 
> >> 2017-09-19 10:23 GMT+02:00 Emily McMahon :
> >>> I've tried every combination I can think of to pass an s3 path as the
> >>> target directory (url encode, include trailing slash, etc)
> >>> 
> >>> I can successfully pass a local path as the target directory (ie
> >>> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't
> >>> think there's a problem with the jobId or rest of the url. I also
> >>> verified
> >>> I can create the savepoint on s3 from the command line so it's not a
> >>> permission issue.
> >>> 
> >>> Here's the same question on stack overflow
> >>>  >>> ully-qualified-path-to-a-savepoint-directory-using-the> (with the
> >>> exception that they are getting a 502 whereas I'm getting a 404)
> >>> 
> >>> using Flink 1.3.1
> >>> 
> >>> Anyone have a working example?
> >>> 
> >>> Thanks,
> >>> Emily





Re: Clean GlobalWidnow state

2017-09-20 Thread gerardg
I have prepared a repo that reproduces the issue: 
https://github.com/GerardGarcia/flink-global-window-growing-state

Maybe this way it is easier to spot the error or we can determine if it is a
bug.

Gerard





Savepoints and migrating value state data types

2017-09-20 Thread mrooding
Hi

We've got a situation where we're merging several Kafka streams and for
certain streams, we want to retain up to 6 days of history. We're trying to
figure out how we can migrate savepoint data between application updates
when the data type for a certain state buffer updates.

Let's assume that we have 2 streams with the following data types:

case class A(id: String, name: String)
case class B1(id: String, price: Double)

We have a CoProcessFunction which combines the 2 streams and maintains 2
different buffer states:

MapState[String, A] and ValueState[B1]

In our scenario, we're trying to anticipate the data type of B1 changing in
the future. Let's assume that in the foreseeable future, B1 will change to:

case class B2(id: String, price: Double, date: String)

When we create a snapshot using B1 and then upgrade the application to B2,
the obvious attempt would be to try to retrieve both the stored ValueState
and the new ValueState:

val oldState = getRuntimeContext.getState(
  new ValueStateDescriptor[B1]("1Buffer", createTypeInformation[B1]))
val newState = getRuntimeContext.getState(
  new ValueStateDescriptor[B2]("2Buffer", createTypeInformation[B2]))

However, as soon as you do, the following error occurs:

Unable to restore keyed state [aBuffer]. For memory-backed keyed state, the
previous serializer of the keyed state must be present; the serializer could
have been removed from the classpath, or its implementation have changed and
could not be loaded. This is a temporary restriction that will be fixed in
future versions.

Our assumption is that the process operator has a specified ID which
Flink uses to save and restore savepoints. The CoProcessFunction type
changed from CoProcessFunction[A, B1, A] to CoProcessFunction[A, B2, A] and
therefore the savepoint data no longer applies to the operator. Is this
assumption correct?

We've been going through the documentation and source code of Flink and it
seems like there's no way to achieve this kind of migration. If this is the
case, we'd be interested in contributing to Flink to get this added a.s.a.p.
and would love to get some feedback on how to approach this.

Thanks in advance

Marc





Re: [DISCUSS] Dropping Scala 2.10

2017-09-20 Thread Ted Yu
+1
Original message
From: Hai Zhou
Date: 9/20/17 12:44 AM (GMT-08:00)
To: Aljoscha Krettek, d...@flink.apache.org, user
Subject: Re: [DISCUSS] Dropping Scala 2.10
+1

> On 19 Sep 2017, at 17:56, Aljoscha Krettek wrote:
> 
> Hi,
> 
> Talking to some people I get the impression that Scala 2.10 is quite outdated 
> by now. I would like to drop support for Scala 2.10 and my main motivation is 
> that this would allow us to drop our custom Flakka build of Akka that we use 
> because newer Akka versions only support Scala 2.11/2.12 and we need a 
> backported feature.
> 
> Are there any concerns about this?
> 
> Best,
> Aljoscha



Re: [DISCUSS] Dropping Scala 2.10

2017-09-20 Thread Federico D'Ambrosio
Hi, as far as I know some vendors like Hortonworks still use Kafka_2.10 as
part of their hadoop distribution.
Could the use of a different scala version cause issues with the Kafka
connector? I'm asking because we are using HDP 2.6 and we once already had
some issue with conflicting scala versions concerning Kafka (though, we
were using Storm, I still haven't tested the Flink connector in this
context).

Regards,
Federico

2017-09-20 14:19 GMT+02:00 Ted Yu :

> +1
>
>  Original message 
> From: Hai Zhou 
> Date: 9/20/17 12:44 AM (GMT-08:00)
> To: Aljoscha Krettek , d...@flink.apache.org, user <
> user@flink.apache.org>
> Subject: Re: [DISCUSS] Dropping Scala 2.10
>
> +1
>
> > On 19 Sep 2017, at 17:56, Aljoscha Krettek wrote:
> >
> > Hi,
> >
> > Talking to some people I get the impression that Scala 2.10 is quite
> > outdated by now. I would like to drop support for Scala 2.10 and my main
> > motivation is that this would allow us to drop our custom Flakka build of
> > Akka that we use because newer Akka versions only support Scala 2.11/2.12
> > and we need a backported feature.
> >
> > Are there any concerns about this?
> >
> > Best,
> > Aljoscha
>
>


-- 
Federico D'Ambrosio


Re: Empty directories left over from checkpointing

2017-09-20 Thread Stefan Richter
Hi,

We recently removed some cleanup code, because it involved checking some store 
metadata to determine when we can delete a directory. For certain stores (like 
S3), requesting this metadata whenever we delete a file was so expensive that 
it could bring down the job because state removal could not be processed fast 
enough. We have a temporary fix in place now, so that jobs at large scale can 
still run reliably on stores like S3. Currently, this comes at the cost of not 
cleaning up directories, but we are planning to introduce a different 
mechanism for directory cleanup in the future that is not as fine-grained as 
doing metadata queries per file delete. In the meantime, unfortunately, the 
best way is to clean up empty directories with some external tool.

Best,
Stefan

> On 20.09.2017 at 01:23, Hao Sun wrote:
> 
> Thanks Elias! Seems like there is no better answer than "do not care about 
> them now", or delete with a background job.
> 
> On Tue, Sep 19, 2017 at 4:11 PM Elias Levy wrote:
> There are a couple of related JIRAs:
> 
> https://issues.apache.org/jira/browse/FLINK-7587 
> 
> https://issues.apache.org/jira/browse/FLINK-7266 
> 
> 
> 
> On Tue, Sep 19, 2017 at 12:20 PM, Hao Sun wrote:
> Hi, I am using RocksDB and S3 as storage backend for my checkpoints.
> Can flink delete these empty directories automatically? Or I need a 
> background job to do the deletion?
> 
> I know this has been discussed before, but I could not get a concrete answer 
> for it yet. Thanks
> 
> 
> 



Re: Delay in Flink timers

2017-09-20 Thread Narendra Joshi
I have a couple of questions related to this:

1. We store state per key (RocksDB backend). Currently, the state size
is ~1.5 GB. Checkpointing time sometimes reaches ~10-20 seconds. Is it
possible that checkpointing is affecting timer execution?
2. Does checkpointing cause Flink to stop consumption of data streams
(say from Kafka)? We have observed that when the timers are delayed,
there is a delay in picking up messages from Kafka.
3. Are there any metrics exposed by Flink that could help us
understand better where the delay is coming from? Is there a metric
for knowing about contention between `processElement` and `onTimer`?
4. Is there a plan for moving from ScheduledThreadPoolExecutor to
using timing wheels for timeouts?

If there is any other information that you need, please let me know.
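
For readers unfamiliar with the fairness point quoted below (a non-fair lock shared by `processElement` and `onTimer`), here is a small plain-Java sketch of the difference between a non-fair and a fair lock. It is only an illustration of the general concept: the class and method names are invented, and according to the quoted reply Flink uses a `synchronized` block here, not a `ReentrantLock`.

```java
import java.util.concurrent.locks.ReentrantLock;

// Illustration only; this is not how Flink guards its callbacks.
final class LockFairnessSketch {

    // Non-fair (the default): like a synchronized block, a thread that just
    // released the lock may reacquire it before threads that waited longer.
    static final ReentrantLock NON_FAIR = new ReentrantLock();

    // Fair: under contention the longest-waiting thread is favored, at some
    // cost in throughput.
    static final ReentrantLock FAIR = new ReentrantLock(true);

    // Runs a callback while holding the given lock, mimicking mutually
    // exclusive element and timer callbacks.
    static void runExclusive(ReentrantLock lock, Runnable callback) {
        lock.lock();
        try {
            callback.run();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        runExclusive(FAIR, () -> System.out.println("element processed"));
        runExclusive(FAIR, () -> System.out.println("timer fired"));
        System.out.println("fair lock: " + FAIR.isFair());
        System.out.println("default lock fair: " + NON_FAIR.isFair());
    }
}
```

With a non-fair lock, a thread that keeps feeding elements can repeatedly win the lock, which is the starvation scenario raised in this thread.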

On Tue, Sep 19, 2017 at 10:37 PM, Narendra Joshi  wrote:
> The number of timers is about 400 per second. We have observed that onTimer
> calls are delayed only when the number of scheduled timers starts increasing
> from a minimum. It would be great if you could share pointers to code I can
> look at to understand it better. :)
>
> Narendra Joshi
>
> On 14 Sep 2017 16:04, "Aljoscha Krettek"  wrote:
>>
>> Hi,
>>
>> Yes, execution of these methods is protected by a synchronized block. This
>> is not a fair lock so incoming data might starve timer callbacks. What is
>> the number of timers we are talking about here?
>>
>> Best,
>> Aljoscha
>>
>> > On 11. Sep 2017, at 19:38, Chesnay Schepler  wrote:
>> >
>> > It is true that onTimer and processElement are never called at the same
>> > time.
>> >
>> > I'm not entirely sure whether there is any prioritization/fairness
>> > between these methods
>> > (if not, it could be that onTimer is starved), looping in Aljoscha who
>> > hopefully knows more
>> > about this.
>> >
>> > On 10.09.2017 09:31, Narendra Joshi wrote:
>> >> Hi,
>> >>
>> >> We are using Flink as a timer scheduler and delay in timer execution is
>> >> a huge problem for us. What we have experienced is that as the number
>> >> of
>> >> Timers we register increases the timers start getting delayed (for more
>> >> than 5 seconds). Can anyone point us in the right direction to figure
>> >> out what might be happening?
>> >>
>> >> I have been told that `onTimer` and `processElement` are called with a
>> >> mutually exclusive lock. Could this locking be the reason this is
>> >> happening? In both the functions there is no IO happening and it should
>> >> not take 5 seconds.
>> >>
>> >> Is it possible that calls to `processElement` starve `onTimer` calls?
>> >>
>> >>
>> >> --
>> >> Narendra Joshi
>> >>
>> >
>>
>


Re: Classpath/ClassLoader issues

2017-09-20 Thread Fabian Hueske
Hi Garrett,

I think I identified the problem.
You said you put the Hive/HCat dependencies into your user fat Jar,
correct? In this case, they are loaded with Flink's userClassLoader (as
described before).

In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly loads
the user classes with the user class loader.
However, when the HCatOutputFormat.getOutputCommitter() method is called,
Hive tries to load additional classes with the current thread class loader
(see at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
This behavior is actually OK, because we usually set the context
classloader to be the user classloader before calling user code. However,
this has not been done here.
So, this is in fact a bug.

I created this JIRA issue: https://issues.apache.org/jira/browse/FLINK-7656
and will open a PR for that.
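
The usual pattern referred to above, setting the context classloader before calling user code and restoring it afterwards, looks roughly like the following in plain Java. This is a generic sketch with invented names, not the code from the actual fix; the empty `URLClassLoader` only stands in for Flink's user classloader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

// Generic illustration of the pattern; not taken from the Flink code base.
final class ContextClassLoaderSketch {

    // Runs the action with the given loader installed as the thread's context
    // classloader and restores the previous loader afterwards, even on failure.
    static <T> T callWithContextClassLoader(ClassLoader loader, Supplier<T> action) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        try {
            return action.get();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the user classloader that would load the job jar.
        ClassLoader userLoader = new URLClassLoader(
                new URL[0], ContextClassLoaderSketch.class.getClassLoader());

        // Libraries that resolve classes through the context classloader
        // would see userLoader inside the action.
        ClassLoader seen = callWithContextClassLoader(
                userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("user loader visible inside action: " + (seen == userLoader));
    }
}
```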

Thanks for helping to diagnose the issue,
Fabian

2017-09-19 22:05 GMT+02:00 Garrett Barton :

> Fabian,
>
>  It looks like hive instantiates both input and output formats when doing
> either. I use hive 1.2.1, and you can see in HCatUtil.getStorageHandler
> where it tries to load both.  It looks like its happening after the writes
> complete and flink is in the finish/finalize stage.  When I watch the
> counters in the Flink ui, i see all output tasks mark finished along with
> bytes sent and records sent being exactly what I expect them to be.  The
> first error also mentions the master, is this the flink jobmanager process
> then?
>
> The expanded stacktrace is:
>
> Caused by: java.lang.Exception: Failed to finalize execution on master
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
> at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
> ... 8 more
> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
> ... 14 more
> Caused by: java.io.IOException: Failed to load foster storage handler
> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
> ... 16 more
> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:74)
> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.<init>(FosterStorageHandler.java:68)
> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:404)
>
>
> Thank you all for any help. :)
>
> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske  wrote:
>
>> Hi Garrett,
>>
>> Flink distinguishes between two classloaders: 1) the system classloader
>> which is the main classloader of the process. This classloader loads all
>> jars in the ./lib folder and 2) the user classloader which loads the job
>> jar.
>> AFAIK, the different operators do not have distinct classloaders. So, in
>> principle all operators should use the same user classloader.
>>
>> According to the stacktrace you posted, the OrcInputFormat cannot be
>> found when you try to emit to an ORC file.
>> This looks suspicious because I would rather expect the OrcOutputFormat
>> to be the problem than the input format.
>> Can you post more of the stacktrace

Re: Rest API cancel-with-savepoint: 404s when passing path as target-directory

2017-09-20 Thread Eron Wright
It is not surprising to see fidelity issues with the YARN proxy.  I suggest
opening a ticket on the Flink side to update the cancel-with-savepoint API to
take the target directory as a query string parameter (of course, backwards
compatibility should be maintained).

On Wed, Sep 20, 2017 at 1:55 AM, Nico Kruber  wrote:

> Hi Emily,
> I'm not familiar with the details of the REST API either but if this is a
> problem with the proxy, maybe it is already interpreting the encoded URL
> and
> passes it on un-encoded - have you tried encoding the path again? That is,
> encoding the percent-signs:
>
> http://
> {ip}:20888/proxy/application_1504649135200_0001/jobs/
> 1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/
> s3%253A%252F%252F%252Fremit-flink
>
>
> Nico
>
> On Wednesday, 20 September 2017 00:52:05 CEST Emily McMahon wrote:
> > Thanks Eron & Fabian.
> >
> > The issue was hitting a yarn proxy url vs the node itself. For example
> > this worked
> > http://{ip}:37716/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink
> >
> > But this did not
> > http://{ip}:20888/proxy/application_1504649135200_0001/jobs/1a0fd176ec8aabb9b8464fa481f755f0/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fremit-flink
> >
> > It's a bit confusing because the cancel api works with either and the
> > proxy url sometimes works as this was successful
> > http://{ip}:20888/proxy/application_1504649135200_0001/jobs/cca2dd609c716a7b0a195700777e5b1f/cancel-with-savepoint/target-directory/tmp/
> >
> > Cheers,
> > Emily
> >
> > On Tue, Sep 19, 2017 at 2:37 PM, Eron Wright wrote:
> > > Good news, it can be done if you carefully encode the target directory
> > > with percent-encoding, as per:
> > > https://tools.ietf.org/html/rfc3986#section-2.1
> > >
> > > For example, given the directory `s3:///savepoint-bucket/my-awesome-job`,
> > > which encodes to `s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job`, I was
> > > able to submit the following URL:
> > > http://localhost:8081/jobs/5c360ded6e4b7d8db103e71d68b7c83d/cancel-with-savepoint/target-directory/s3%3A%2F%2F%2Fsavepoint-bucket%2Fmy-awesome-job
> > >
> > > And see the following in the log:
> > > 2017-09-19 14:27:45,939 INFO org.apache.flink.runtime.jobmanager.JobManager
> > > - Trying to cancel job 5c360ded6e4b7d8db103e71d68b7c83d
> > > with savepoint to s3:///savepoint-bucket/my-awesome-job
> > >
> > > -Eron
> > >
> > > On Tue, Sep 19, 2017 at 1:54 PM, Fabian Hueske 
> wrote:
> > >> Hi Emily,
> > >>
> > >> thanks for reaching out.
> > >> I'm not familiar with the details of the Rest API but Ufuk (in CC)
> might
> > >> be able to help you.
> > >>
> > >> Best, Fabian
> > >>
> > >> 2017-09-19 10:23 GMT+02:00 Emily McMahon :
> > >>> I've tried every combination I can think of to pass an s3 path as the
> > >>> target directory (url encode, include trailing slash, etc)
> > >>>
> > >>> I can successfully pass a local path as the target directory (ie
> > >>> /jobs/$jobID/cancel-with-savepoint/target-directory/tmp) so I don't
> > >>> think there's a problem with the jobId or rest of the url. I also
> > >>> verified
> > >>> I can create the savepoint on s3 from the command line so it's not a
> > >>> permission issue.
> > >>>
> > >>> Here's the same question on stack overflow
> > >>>  job-manager-a-f
> > >>> ully-qualified-path-to-a-savepoint-directory-using-the> (with the
> > >>> exception that they are getting a 502 whereas I'm getting a 404)
> > >>>
> > >>> using Flink 1.3.1
> > >>>
> > >>> Anyone have a working example?
> > >>>
> > >>> Thanks,
> > >>> Emily
>
>


Re: Classpath/ClassLoader issues

2017-09-20 Thread Fabian Hueske
Here's the pull request that hopefully fixes your issue:
https://github.com/apache/flink/pull/4690

Best, Fabian

2017-09-20 16:15 GMT+02:00 Fabian Hueske :

> Hi Garrett,
>
> I think I identified the problem.
> You said you put the Hive/HCat dependencies into your user fat Jar,
> correct? In this case, they are loaded with Flink's userClassLoader (as
> described before).
>
> In the OutputFormatVertex.finalizeOnMaster() method, Flink correctly
> loads the user classes with the user class loader.
> However, when the HCatOutputFormat.getOutputCommitter() method is called,
> Hive tries to load additional classes with the current thread class loader
> (see at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)).
> This behavior is actually OK, because we usually set the context
> classloader to be the user classloader before calling user code. However,
> this has not been done here.
> So, this is in fact a bug.
>
> I created this JIRA issue: https://issues.apache.org/jira/browse/FLINK-7656
> and will open a PR for that.
>
> Thanks for helping to diagnose the issue,
> Fabian
>
> 2017-09-19 22:05 GMT+02:00 Garrett Barton :
>
>> Fabian,
>>
>>  It looks like hive instantiates both input and output formats when doing
>> either. I use hive 1.2.1, and you can see in HCatUtil.getStorageHandler
>> where it tries to load both.  It looks like its happening after the writes
>> complete and flink is in the finish/finalize stage.  When I watch the
>> counters in the Flink ui, i see all output tasks mark finished along with
>> bytes sent and records sent being exactly what I expect them to be.  The
>> first error also mentions the master, is this the flink jobmanager process
>> then?
>>
>> The expanded stacktrace is:
>>
>> Caused by: java.lang.Exception: Failed to finalize execution on master
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1325)
>> at org.apache.flink.runtime.executiongraph.ExecutionVertex.executionFinished(ExecutionVertex.java:688)
>> at org.apache.flink.runtime.executiongraph.Execution.markFinished(Execution.java:797)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.updateState(ExecutionGraph.java:1477)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply$mcV$sp(JobManager.scala:710)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$4.apply(JobManager.scala:709)
>> ... 8 more
>> Caused by: java.lang.RuntimeException: java.io.IOException: Failed to load foster storage handler
>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:202)
>> at org.apache.flink.runtime.jobgraph.OutputFormatVertex.finalizeOnMaster(OutputFormatVertex.java:118)
>> at org.apache.flink.runtime.executiongraph.ExecutionGraph.vertexFinished(ExecutionGraph.java:1320)
>> ... 14 more
>> Caused by: java.io.IOException: Failed to load foster storage handler
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:409)
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(HCatUtil.java:367)
>> at org.apache.hive.hcatalog.mapreduce.HCatBaseOutputFormat.getOutputFormat(HCatBaseOutputFormat.java:77)
>> at org.apache.hive.hcatalog.mapreduce.HCatOutputFormat.getOutputCommitter(HCatOutputFormat.java:275)
>> at org.apache.flink.api.java.hadoop.mapreduce.HadoopOutputFormatBase.finalizeGlobal(HadoopOutputFormatBase.java:200)
>> ... 16 more
>> Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.java:78)
>> at org.apache.hadoop.hive.common.JavaUtils.loadClass(JavaUtils.74)
>> at org.apache.hive.hcatalog.mapreduce.FosterStorageHandler.> t>(FosterStorageHandler.68)
>> at org.apache.hive.hcatalog.common.HCatUtil.getStorageHandler(H
>> CatUtil.java:404)
>>
>>
>> Thank you all for any help. :)
>>
>> On Tue, Sep 19, 2017 at 11:05 AM, Fabian Hueske 
>> wrote:
>>
>>> Hi Garrett,
>>>
>>> Flink distinguishes between two classloaders: 1) the system classloader
>>> which is the main classloader of the process. This classloader loads all
>>> jars in the ./lib folder and 2) the user classloader which loads the job
>>> jar.
>>> AFAIK, the different operators do not have distinct classloaders. So, in
>>> principle all operators should use the same user classloa

RE: the design of spilling to disk

2017-09-20 Thread Newport, Billy
Don’t forget there is also spilling/serialization in between stages in the 
pipeline if operations cannot be chained.


From: Kurt Young [mailto:ykt...@gmail.com]
Sent: Tuesday, September 19, 2017 9:09 PM
To: Florin Dinu
Cc: Kostas Kloudas; user@flink.apache.org; fhue...@apache.org
Subject: Re: the design of spilling to disk

Copied from my earlier response to a similar question:

"Here is a short description of how it works: there are 3 threads in total
working together, one for reading, one for sorting partial data in memory, and
the last one is responsible for spilling. Flink will first figure out how much
memory it can use during the in-memory sort, and manage it as MemorySegments.
Once this memory runs out, the sorting thread will take over the memory and
do the in-memory sorting (for more details about in-memory sorting, you can see
NormalizedKeySorter). After this, the spilling thread will write the sorted
data to disk and make the memory available again for reading. This is
repeated until all data has been processed.
Normally, the data will be read twice (once from the source, and once from disk)
and written once, but if too many files were spilled, Flink will first merge
some of the files to make sure the last merge step does not exceed a limit on
the number of files (default 128). Hope this can help you."
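
To make the read / sort / spill / merge cycle above concrete, here is a heavily simplified, single-threaded sketch of an external merge sort in plain Java. It is not Flink's UnilateralSortMerger: there are no separate threads, no MemorySegments, and no intermediate merge to cap the fan-in. The class name, the bufferSize parameter and the text-file run format are all assumptions chosen for illustration.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class ExternalSortSketch {

    /** One sorted run on disk plus the value most recently read from it. */
    private static final class RunCursor implements Comparable<RunCursor> {
        final BufferedReader reader;
        int current;

        RunCursor(BufferedReader reader) { this.reader = reader; }

        /** Reads the next value; closes the run and returns false at its end. */
        boolean advance() throws IOException {
            String line = reader.readLine();
            if (line == null) { reader.close(); return false; }
            current = Integer.parseInt(line);
            return true;
        }

        @Override
        public int compareTo(RunCursor other) { return Integer.compare(current, other.current); }
    }

    /** Sorts the input while holding at most bufferSize records in the sort buffer. */
    public static List<Integer> sort(Iterator<Integer> input, int bufferSize) throws IOException {
        List<Path> runs = new ArrayList<>();
        List<Integer> buffer = new ArrayList<>(bufferSize);
        while (input.hasNext()) {
            buffer.add(input.next());                  // "reading" step
            if (buffer.size() == bufferSize || !input.hasNext()) {
                Collections.sort(buffer);              // "in-memory sort" step
                runs.add(spill(buffer));               // "spilling" step
                buffer.clear();                        // buffer is free for reading again
            }
        }
        return merge(runs);
    }

    /** Writes one sorted run to a temporary file, one value per line. */
    private static Path spill(List<Integer> sorted) throws IOException {
        Path run = Files.createTempFile("sort-run-", ".txt");
        try (BufferedWriter writer = Files.newBufferedWriter(run)) {
            for (int value : sorted) {
                writer.write(Integer.toString(value));
                writer.newLine();
            }
        }
        return run;
    }

    /** K-way merge of all runs using a min-heap keyed on each run's current value. */
    private static List<Integer> merge(List<Path> runs) throws IOException {
        PriorityQueue<RunCursor> heap = new PriorityQueue<>();
        for (Path run : runs) {
            RunCursor cursor = new RunCursor(Files.newBufferedReader(run));
            if (cursor.advance()) heap.add(cursor);
        }
        List<Integer> result = new ArrayList<>();
        while (!heap.isEmpty()) {
            RunCursor smallest = heap.poll();
            result.add(smallest.current);
            if (smallest.advance()) heap.add(smallest);
        }
        for (Path run : runs) Files.deleteIfExists(run);
        return result;
    }

    public static void main(String[] args) throws IOException {
        List<Integer> input = Arrays.asList(5, 3, 9, 1, 7, 2, 8);
        System.out.println(sort(input.iterator(), 3)); // prints [1, 2, 3, 5, 7, 8, 9]
    }
}
```

In this sketch every record is written to disk once and read back once during the final merge, which matches the "read twice, written once" accounting in the description when no intermediate merge is needed.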

Best,
Kurt

On Wed, Sep 20, 2017 at 12:19 AM, Florin Dinu <florin.d...@epfl.ch> wrote:

Hi Kostas,

Thank you for the quick reply and the tips. I will check them out!

I would like to start by understanding the way secondary storage is used in
batch processing.
If you guys have additional pointers on that, it would certainly help me a lot.

Thanks again,
Florin

From: Kostas Kloudas <k.klou...@data-artisans.com>
Sent: Tuesday, September 19, 2017 18:10
To: Florin Dinu
Cc: user@flink.apache.org; fhue...@apache.org
Subject: Re: the design of spilling to disk

Hi Florin,

Unfortunately, there is no design document.

The UnilateralSortMerger.java is used in the batch processing mode (not in
streaming) and,
in fact, the code dates some years back. I have also cc'd Fabian, as he may
have more to say on this.

Now for the streaming side, Flink uses 3 state-backends, in-memory (no spilling 
and mainly useful for testing),
filesystem and RocksDB (both eventually spill to disk but in different ways), 
and it also supports incremental
checkpoints, i.e. at each checkpoint it only stores the diff between 
checkpoint[i] and checkpoint[i-1].
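
As a purely conceptual illustration of "only store the diff between checkpoint[i] and checkpoint[i-1]", here is a small plain-Java sketch that computes and re-applies a delta between two snapshots of keyed state. It says nothing about how Flink's state backends actually store or ship state; the class name, the Delta type and the map-based state are assumptions made for this example, and values are assumed to be non-null.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class IncrementalCheckpointSketch {

    /** Hypothetical delta: entries added or changed, plus keys removed. */
    public static final class Delta {
        public final Map<String, Long> upserts = new HashMap<>();
        public final Set<String> removals = new HashSet<>();
    }

    /** Computes what changed between the previous and the current snapshot. */
    public static Delta diff(Map<String, Long> previous, Map<String, Long> current) {
        Delta delta = new Delta();
        for (Map.Entry<String, Long> entry : current.entrySet()) {
            // New key (previous.get returns null) or changed value.
            if (!entry.getValue().equals(previous.get(entry.getKey()))) {
                delta.upserts.put(entry.getKey(), entry.getValue());
            }
        }
        for (String key : previous.keySet()) {
            if (!current.containsKey(key)) {
                delta.removals.add(key);
            }
        }
        return delta;
    }

    /** Rebuilds the current snapshot from the previous one plus the delta. */
    public static Map<String, Long> apply(Map<String, Long> previous, Delta delta) {
        Map<String, Long> restored = new HashMap<>(previous);
        restored.keySet().removeAll(delta.removals);
        restored.putAll(delta.upserts);
        return restored;
    }

    public static void main(String[] args) {
        Map<String, Long> checkpoint1 = new HashMap<>();
        checkpoint1.put("a", 1L);
        checkpoint1.put("b", 2L);
        checkpoint1.put("c", 3L);

        Map<String, Long> checkpoint2 = new HashMap<>();
        checkpoint2.put("a", 1L);   // unchanged, not part of the delta
        checkpoint2.put("b", 5L);   // changed
        checkpoint2.put("d", 4L);   // added; "c" was removed

        Delta delta = diff(checkpoint1, checkpoint2);
        System.out.println(delta.upserts.size() + " upserts, "
                + delta.removals.size() + " removals");            // prints "2 upserts, 1 removals"
        System.out.println(apply(checkpoint1, delta).equals(checkpoint2)); // prints "true"
    }
}
```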

For more information on Flink state and state backends, check out the latest 
talk from Stefan Richter at
Flink Forward Berlin 2017 
(https://www.youtube.com/watch?v=dWQ24wERItM)
 and the .

Cheers,
Kostas

On Sep 19, 2017, at 6:00 PM, Florin Dinu <florin.d...@epfl.ch> wrote:

Hello everyone,

In our group at EPFL we're doing research on understanding and potentially 
improving the performance of data-parallel frameworks that use secondary 
storage.
I was looking at the Flink code to understand how spilling to disk actually 
works.
So far I got to the UnilateralSortMerger.java and its spill and reading 
threads. I also saw there are some spilling markers used.
I am curious if there is any design document available on this topic.
I was not able to find much online.
If there is no such design document I would appreciate if someone could help me 
understand how these spilling markers are used.
At a higher level, I am trying to understand how much data does Flink spill to 
disk after it has concluded that it needs to spill to disk.

Thank you very much
Florin Dinu




Re: Classpath/ClassLoader issues

2017-09-20 Thread Garrett Barton
Fabian,
 Awesome!  After your initial email I got things to work by deploying my
fat jar into the flink/lib folder, and voilà! it worked. :)  I will grab
your pull request and give it a go tomorrow.


Re: [DISCUSS] Dropping Scala 2.10

2017-09-20 Thread Aljoscha Krettek
Hi Federico,

As far as I know, the Kafka client code has been rewritten in Java for version 
0.9, meaning there is no more Scala dependency in there. Only the server 
(broker) code still contains Scala but it doesn't matter what Scala version a 
client uses, if any.

Best,
Aljoscha 
> On 20. Sep 2017, at 14:32, Federico D'Ambrosio  wrote:
> 
> Hi, as far as I know some vendors like Hortonworks still use Kafka_2.10 as 
> part of their hadoop distribution. 
> Could the use of a different scala version cause issues with the Kafka 
> connector? I'm asking because we are using HDP 2.6 and we once already had 
> some issue with conflicting scala versions concerning Kafka (though, we were 
> using Storm, I still haven't tested the Flink connector in this context).
> 
> Regards,
> Federico
> 
> 2017-09-20 14:19 GMT+02:00 Ted Yu:
> +1
> 
>  Original message 
> From: Hai Zhou <yew...@gmail.com>
> Date: 9/20/17 12:44 AM (GMT-08:00)
> To: Aljoscha Krettek <aljos...@apache.org>, d...@flink.apache.org, user <user@flink.apache.org>
> Subject: Re: [DISCUSS] Dropping Scala 2.10
> 
> +1
> 
> > On 19 Sep 2017, at 17:56, Aljoscha Krettek wrote:
> > 
> > Hi,
> > 
> > Talking to some people I get the impression that Scala 2.10 is quite 
> > outdated by now. I would like to drop support for Scala 2.10 and my main 
> > motivation is that this would allow us to drop our custom Flakka build of 
> > Akka that we use because newer Akka versions only support Scala 2.11/2.12 
> > and we need a backported feature.
> > 
> > Are there any concerns about this?
> > 
> > Best,
> > Aljoscha
> 
> 
> 
> 
> -- 
> Federico D'Ambrosio



LocalStreamEnvironment - configuration doesn't seem to be used in RichFunction operators

2017-09-20 Thread Michael Kobit
I'm new to Flink and in the process of trying to write a few operators and
tests for them. One of the issues I've run into is "how do I properly set
up the dependencies for an operator". I've discovered the serialization
constraints and learned some about the execution model as I've started to
progress through it, but I'm still struggling to find an analog for
dependency injection in Flink.

I was experimenting with different ways to supply configuration for the
*Rich* functions to basically set themselves up and tear themselves down
with their dependencies on open/close. I wanted to basically "inject" a
dependency, say an HTTP client that caches, and then mock that
dependency for a local test instead of actually making HTTP calls. It
seemed like it could be done by getting the correct
implementation types from the config using some custom injector type
(analogous to Spring or Guice dependency injection). I know I have to deal
with serialization of the operators, which is why I was thinking I could do
this in open/close and have the magical injector be serializable (and possibly
be part of the config). This may or may not be a bad idea already, but bear
with me (and any feedback is very appreciated).
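
To illustrate the idea in plain Java, without any Flink classes: a minimal sketch in which the function object carries only a serializable factory and builds the real (non-serializable) dependency in an open() step after it has been shipped. All names here (Lookup, LookupFactory, EnrichingMapper, TimesTenFactory) are hypothetical, and the open()/map() methods only mimic the lifecycle of a rich function; a test would pass a factory that returns a stub, as TimesTenFactory does.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableFactorySketch {

    /** Hypothetical dependency; stands in for something like a caching HTTP client. */
    public interface Lookup {
        int fetch(int key);
    }

    /** Serializable factory that travels with the function instead of the dependency. */
    public interface LookupFactory extends Serializable {
        Lookup create();
    }

    /** Test double: a factory whose Lookup multiplies by ten instead of calling out. */
    public static final class TimesTenFactory implements LookupFactory {
        private static final long serialVersionUID = 1L;

        @Override
        public Lookup create() {
            return key -> key * 10;
        }
    }

    /** Mimics the open/map lifecycle of a rich function without depending on Flink. */
    public static final class EnrichingMapper implements Serializable {
        private static final long serialVersionUID = 1L;

        private final LookupFactory factory;
        private transient Lookup lookup; // built in open(), never serialized

        public EnrichingMapper(LookupFactory factory) {
            this.factory = factory;
        }

        public void open() {
            lookup = factory.create();
        }

        public int map(int value) {
            return lookup.fetch(value);
        }
    }

    /** Serializes and deserializes an object, as shipping it to a worker would. */
    public static <T> T roundTrip(T object) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(object);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        EnrichingMapper shipped = roundTrip(new EnrichingMapper(new TimesTenFactory()));
        shipped.open();
        System.out.println(shipped.map(1) + " " + shipped.map(2)); // prints "10 20"
    }
}
```

The design choice is that only the factory has to be serializable; whatever it creates can hold sockets, caches or other state that would not survive serialization.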

I was doing some local testing using StreamExecutionEnvironment, but wasn't
able to actually pass in configuration options to the local stream
execution.

I tried it these ways:

   1. Create the environment with a config
   by StreamExecutionEnvironment.createLocalEnvironment(1, configuration)
   2. Configure the created LocalStreamEnvironment
   by env.getConfig().setGlobalJobParameters(configuration)
   3. Configure the DataStreamSource
   by source.getExecutionConfig().setGlobalJobParameters(configuration)
   4. Configure the SingleOutputStreamOperator
   by mapped.getExecutionConfig().setGlobalJobParameters(configuration)

All 4 of those failed, so I feel like I am doing something wrong here, and
wanted to reach out.

Here is the example code where all of those tests fail:

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Test;

import java.util.Iterator;

public class FlinkInspection {

    @Test
    public void issueWithLocalStreamEnvironmentCreateWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(1, configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped =
            source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguredWithWithConfiguration() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        env.getConfig().setGlobalJobParameters(configuration);
        DataStreamSource<Integer> source = env.fromElements(1, 2);

        SingleOutputStreamOperator<Integer> mapped =
            source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamSource() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);
        source.getExecutionConfig().setGlobalJobParameters(configuration);

        SingleOutputStreamOperator<Integer> mapped =
            source.map(new ConfigurationRetrievingOperator());

        Iterator<Integer> collection = DataStreamUtils.collect(mapped);
        env.execute();

        assertThat(collection).containsExactlyInAnyOrder(10, 20);
    }

    @Test
    public void issueWithLocalStreamEnvironmentConfiguringDataStreamWithOperator() throws Exception {
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
        DataStreamSource<Integer> source = env.fromElements(1, 2);
        Configuration configuration = new Configuration();
        configuration.setInteger("key", 10);

        Si

Recommended way to schedule Flink jobs periodically on EMR

2017-09-20 Thread ShB
Are there any recommended ways to schedule and manage Flink Workflows on EMR?
I need to run a series of jobs daily, based on API requests coming in, and
this job will need to interface with S3 for data. 



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/