Hi Aljoscha,

Thanks! I will look into this.
Best,

Aaron Levin

On Fri, Nov 9, 2018 at 5:01 AM, Aljoscha Krettek <aljos...@apache.org> wrote:

> Hi,
>
> I think for this case a model that is similar to how the Streaming File
> Source works should be good. You can have a look at
> ContinuousFileMonitoringFunction and ContinuousFileReaderOperator. The
> idea is that the first emits splits that should be processed and the
> second is responsible for reading those splits. A generic version of that
> is what I'm proposing for the refactoring of our source interface [1],
> which also comes with a prototype implementation [2].
>
> I think something like this should be adaptable to your case. The split
> enumerator would at first only emit file splits downstream; after that it
> would emit the Kafka partitions that should be read. The split reader
> would understand both file splits and Kafka partitions and can read from
> both. This still has some kinks to be worked out when it comes to
> watermarks, as FLIP-27 is not finished.
>
> What do you think?
>
> Best,
> Aljoscha
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> [2] https://github.com/aljoscha/flink/commits/refactor-source-interface
>
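A rough Scala sketch of the two-phase layout described above. All names here (WorkUnit, FileSplit, KafkaPartition, BootstrapEnumerator) are hypothetical, the actual reading is left to a downstream reader operator (analogous to ContinuousFileReaderOperator), and this illustrates the idea only, not the FLIP-27 API:

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

// The enumerator only *describes* work: first the historical files (in order),
// then the Kafka partitions to continue from. A downstream reader operator
// would do the actual reading and could emit a watermark after each file split.
sealed trait WorkUnit
final case class FileSplit(path: String) extends WorkUnit
final case class KafkaPartition(topic: String, partition: Int) extends WorkUnit

class BootstrapEnumerator(files: Seq[String], topic: String, numPartitions: Int)
    extends RichParallelSourceFunction[WorkUnit] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[WorkUnit]): Unit = {
    // Phase 1: emit the sequence-file splits in the required order.
    files.iterator.takeWhile(_ => running).foreach(f => ctx.collect(FileSplit(f)))
    // Phase 2: emit the Kafka partitions to read once bootstrapping is done.
    (0 until numPartitions).iterator
      .takeWhile(_ => running)
      .foreach(p => ctx.collect(KafkaPartition(topic, p)))
    // A real version would also assign work per subtask (e.g. by
    // getRuntimeContext.getIndexOfThisSubtask) and sort out watermarking,
    // which is the part FLIP-27 is meant to make easier.
  }

  override def cancel(): Unit = running = false
}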
> On 1. Nov 2018, at 16:50, Aaron Levin <aaronle...@stripe.com> wrote:
>
> Hey,
>
> Thanks for reaching out! I'd love to take a step back and find a better
> solution, so I'll try to be succinct about what I'm trying to accomplish:
>
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets
>   files in a specific order)
> * sends a watermark between each sequence file
> * when that's complete, starts reading from Kafka topics
> * (this is similar to the bootstrap problem Lyft has talked about; see:
>   https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink)
>
> The current solution I have involves a custom InputFormat, InputSplit, and
> SplitAssignor. It achieves most of these requirements, except that I have
> to extend InputFormatSourceFunction. I have a class that looks like:
>
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka:
>   KafkaBase) extends InputFormatSourceFunction(s3Archives, typestuff) {...}
>
> There is a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is
>   initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of
>   InputFormatSourceFunction so I could insert watermarks between splits
>
> I'd love any suggestions around improving this!
>
> Best,
>
> Aaron Levin
>
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Aaron,
>>
>> I'd like to take a step back and understand why you're trying to wrap an
>> InputFormatSourceFunction.
>>
>> In my opinion, InputFormatSourceFunction should not be used because it
>> has some shortcomings, the most prominent among them being that it does
>> not support checkpointing, i.e. in case of failure all data will
>> (probably) be read again. I'm saying "probably" because the interaction
>> of InputFormatSourceFunction with how InputSplits are generated (which
>> relates to that code snippet with the cast you found) could be somewhat
>> "spooky" and lead to weird results in some cases.
>>
>> The interface is a remnant of a very early version of the streaming API
>> and should probably be removed soon. I hope we can find a better solution
>> for your problem that fits better with Flink.
>>
>> Best,
>> Aljoscha
>>
>> On 1. Nov 2018, at 15:30, Aaron Levin <aaronle...@stripe.com> wrote:
>>
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
>> provide any insight or advice, that would be helpful!
>>
>> Thanks again.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com>
>> wrote:
>>
>>> Hey,
>>>
>>> Not sure how convo threading works on this list, so in case the folks
>>> CC'd missed my other response, here's some more info:
>>>
>>> First, I appreciate everyone's help! Thank you!
>>>
>>> I wrote several wrappers to try and debug this, including one which is
>>> an exact copy of `InputFormatSourceFunction`; it also failed. They all
>>> failed with the same error I detail above. I'll post two of them below.
>>> They all extended `RichParallelSourceFunction` and, as far as I could
>>> tell, were properly initialized (though I may have missed something!).
>>> Additionally, for the two below, if I change `extends
>>> RichParallelSourceFunction` to `extends
>>> InputFormatSourceFunction(...,...)`, I no longer receive the exception.
>>> This is what led me to believe the source of the issue was the casting,
>>> and it is how I found the line of code where the stream graph is given
>>> the input format.
>>>
>>> A quick explanation of the wrappers:
>>> 1. `WrappedInputFormat` does a basic wrap around
>>>    `InputFormatSourceFunction` and delegates all methods to the
>>>    underlying `InputFormatSourceFunction`.
>>> 2. `ClonedInputFormatSourceFunction` is a ~exact copy of the
>>>    `InputFormatSourceFunction` source.
>>> 3. They're being used in a test which looks vaguely like:
>>>    `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>>>    InputFormatSourceFunction[String](source,
>>>    implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>>
>>> class WrappedInputFormat[A](
>>>   inputFormat: InputFormatSourceFunction[A]
>>> )(
>>>   implicit typeInfo: TypeInformation[A]
>>> ) extends RichParallelSourceFunction[A] {
>>>
>>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>>     inputFormat.run(sourceContext)
>>>   }
>>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>>>     inputFormat.setRuntimeContext(t)
>>>   }
>>>   override def equals(obj: scala.Any) = { inputFormat.equals(obj) }
>>>   override def hashCode() = { inputFormat.hashCode() }
>>>   override def toString = { inputFormat.toString }
>>>   override def getRuntimeContext(): RuntimeContext = {
>>>     inputFormat.getRuntimeContext
>>>   }
>>>   override def getIterationRuntimeContext = {
>>>     inputFormat.getIterationRuntimeContext
>>>   }
>>>   override def open(parameters: Configuration): Unit = {
>>>     inputFormat.open(parameters)
>>>   }
>>>   override def cancel(): Unit = {
>>>     inputFormat.cancel()
>>>   }
>>>   override def close(): Unit = {
>>>     inputFormat.close()
>>>   }
>>> }
>>>
>>> And the other one:
>>>
>>> class ClonedInputFormatSourceFunction[A](
>>>     val format: InputFormat[A, InputSplit],
>>>     val typeInfo: TypeInformation[A]
>>> ) extends RichParallelSourceFunction[A] {
>>>
>>>   @transient private var provider: InputSplitProvider = _
>>>   @transient private var serializer: TypeSerializer[A] = _
>>>   @transient private var splitIterator: Iterator[InputSplit] = _
>>>   private var isRunning: Boolean = _
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>>>     if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>>       format.asInstanceOf[RichInputFormat[_, _]].setRuntimeContext(context)
>>>     }
>>>     format.configure(parameters)
>>>
>>>     provider = context.getInputSplitProvider
>>>     serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>>>     splitIterator = getInputSplits()
>>>     isRunning = splitIterator.hasNext
>>>   }
>>>
>>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>>     if (isRunning && format.isInstanceOf[RichInputFormat[_, _]]) {
>>>       format.asInstanceOf[RichInputFormat[_, _]].openInputFormat()
>>>     }
>>>
>>>     var nextElement: A = serializer.createInstance()
>>>     try {
>>>       while (isRunning) {
>>>         format.open(splitIterator.next())
>>>         // requires: import scala.util.control.Breaks.{break, breakable}
>>>         breakable {
>>>           while (isRunning && !format.reachedEnd()) {
>>>             nextElement = format.nextRecord(nextElement)
>>>             if (nextElement != null) {
>>>               sourceContext.collect(nextElement)
>>>             } else {
>>>               break()
>>>             }
>>>           }
>>>         }
>>>         format.close()
>>>         if (isRunning) {
>>>           isRunning = splitIterator.hasNext
>>>         }
>>>       }
>>>     } finally {
>>>       format.close()
>>>       if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>>         format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
>>>       }
>>>       isRunning = false
>>>     }
>>>   }
>>>
>>>   override def cancel(): Unit = {
>>>     isRunning = false
>>>   }
>>>
>>>   override def close(): Unit = {
>>>     format.close()
>>>     if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>>       format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
>>>     }
>>>   }
>>>
>>>   private def getInputSplits(): Iterator[InputSplit] = {
>>>     new Iterator[InputSplit] {
>>>       private var nextSplit: InputSplit = _
>>>       private var exhausted: Boolean = _
>>>
>>>       override def hasNext: Boolean = {
>>>         if (exhausted) {
>>>           return false
>>>         }
>>>         if (nextSplit != null) { return true }
>>>         var split: InputSplit = null
>>>
>>>         try {
>>>           split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>>>         } catch {
>>>           case e: InputSplitProviderException =>
>>>             throw new RuntimeException("No InputSplit Provider", e)
>>>         }
>>>
>>>         if (split != null) {
>>>           nextSplit = split
>>>           true
>>>         } else {
>>>           exhausted = true
>>>           false
>>>         }
>>>       }
>>>
>>>       override def next(): InputSplit = {
>>>         if (nextSplit == null && !hasNext) {
>>>           throw new NoSuchElementException()
>>>         }
>>>         val tmp: InputSplit = nextSplit
>>>         nextSplit = null
>>>         tmp
>>>       }
>>>
>>>     }
>>>   }
>>> }
>>>
>>>
>>> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <dwysakow...@apache.org>
>>> wrote:
>>>
>>>> Hi Aaron,
>>>>
>>>> Could you share the code of your custom function?
>>>>
>>>> I am also adding Aljoscha and Kostas to cc, who should be more helpful
>>>> on that topic.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>>
>>>> On 19/10/2018 20:06, Aaron Levin wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm writing a custom `SourceFunction` which wraps an underlying
>>>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in
>>>> a stream (via `env.addSource` and a subsequent sink), I get errors
>>>> related to the `InputSplitAssigner` not being initialized for a
>>>> particular vertex ID. Full error here [1].
>>>>
>>>> I believe the underlying error is related to this [0] call to
>>>> `instanceof InputFormatSourceFunction`.
>>>>
>>>> *My questions*:
>>>>
>>>> 1. How can I wrap an `InputFormatSourceFunction` in a way that avoids
>>>>    this error? Am I missing a chunk of the API covering this?
>>>> 2. Is the error I'm experiencing related to that casting call? If so,
>>>>    would y'all be open to a PR which adds an interface one can extend
>>>>    which will set the input format in the stream graph? Or is there a
>>>>    preferred way of achieving this?
>>>>
>>>> Thanks!
>>>>
>>>> Aaron Levin
>>>>
>>>> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>>>> [1]
>>>> java.lang.RuntimeException: Could not retrieve next input split.
>>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>>>>     at REDACTED
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>>>>     ... 8 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>>>     ... 9 more
>>>> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>>     ...
>>>>
>>>
>>
>
>
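For context on the "No InputSplitAssigner for vertex ID" failure above: the check Aaron links as [0] only wires up the InputFormat (and hence the split assigner on the JobMaster) when the source's user function is itself an InputFormatSourceFunction. A simplified Scala paraphrase of that special-casing, with the helper name hypothetical and the logic condensed from the Java:

import org.apache.flink.api.common.io.InputFormat
import org.apache.flink.streaming.api.functions.source.{InputFormatSourceFunction, SourceFunction}

// Roughly what the stream graph generation does for a source vertex
// (simplified, not the actual Flink code): only the exact
// InputFormatSourceFunction type gets its InputFormat registered, which is
// what later creates the InputSplitAssigner for the vertex.
def inputFormatFor(fn: SourceFunction[_]): Option[InputFormat[_, _]] =
  fn match {
    case f: InputFormatSourceFunction[_] => Some(f.getFormat) // format registered, splits can be served
    case _                               => None              // a delegating wrapper ends up here
  }

Since WrappedInputFormat above falls into the second branch, no InputSplitAssigner is ever created for that vertex, which matches the exception in the stack trace.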