Hi Aljoscha,

Thanks! I will look into this.

Best,

Aaron Levin

On Fri, Nov 9, 2018 at 5:01 AM, Aljoscha Krettek <aljos...@apache.org>
wrote:

> Hi,
>
> I think a model similar to how the Streaming File Source works should be
> a good fit for this case. Have a look at ContinuousFileMonitoringFunction
> and ContinuousFileReaderOperator: the first emits the splits that should
> be processed, and the second is responsible for reading those splits. A
> generic version of that is what I'm proposing in the refactoring of our
> source interface [1], which also comes with a prototype implementation [2].
>
> I think something like this should be adaptable to your case. The split
> enumerator would at first emit only file splits downstream; after that it
> would emit the Kafka partitions that should be read. The split reader would
> understand both file splits and Kafka partitions and could read from both.
> This still has some kinks to be worked out when it comes to watermarks;
> FLIP-27 is not finished yet.
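>
> To make that concrete, here is a minimal sketch of what such a hybrid
> split type and reader could look like (all names here are hypothetical;
> the actual FLIP-27 interfaces are still in flux):
>
> sealed trait HybridSplit
> final case class FileSplit(path: String, start: Long, length: Long)
>   extends HybridSplit
> final case class KafkaPartitionSplit(topic: String, partition: Int,
>   startOffset: Long) extends HybridSplit
>
> class HybridSplitReader {
>   // The enumerator hands out all FileSplits first; once those are
>   // exhausted it hands out KafkaPartitionSplits, so a single reader
>   // has to understand both kinds of split.
>   def read(split: HybridSplit): Unit = split match {
>     case FileSplit(path, start, length) =>
>       ??? // read records from the sequence file split
>     case KafkaPartitionSplit(topic, partition, startOffset) =>
>       ??? // consume records from the Kafka partition
>   }
> }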
>
> What do you think?
>
> Best,
> Aljoscha
>
> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> [2] https://github.com/aljoscha/flink/commits/refactor-source-interface
>
>
> On 1. Nov 2018, at 16:50, Aaron Levin <aaronle...@stripe.com> wrote:
>
> Hey,
>
> Thanks for reaching out! I'd love to take a step back and find a better
> solution, so I'll try to be succinct in what I'm trying to accomplish:
>
> We're trying to write a SourceFunction which:
> * reads some Sequence files from S3 in a particular order (each task gets
> files in a specific order).
> * emits a watermark between sequence files
> * when that's complete, starts reading from Kafka topics.
> * (This is similar to the bootstrap problem which Lyft has talked about; see:
> https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-gregory-fee-bootstrapping-state-in-apache-flink)
>
> The current solution I have involves a custom InputFormat, InputSplit, and
> SplitAssignor. It achieves most of these requirements, except I have to
> extend InputFormatSourceFunction. I have a class that looks like:
>
> class MySourceFunction(val s3Archives: CustomInputFormat, val kafka: KafkaBase)
>   extends InputFormatSourceFunction(s3Archives, typestuff) { ... }
>
> There's a lot I don't like about the existing solution:
> * I have to extend InputFormatSourceFunction to ensure the graph is
> initialized properly (the bug I wrote about)
> * I had to replicate most of the implementation of
> InputFormatSourceFunction so I could insert Watermarks between splits
> (see the sketch below).
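>
> The replicated read loop ends up looking roughly like this (a hand-wavy
> sketch; readSplit, maxTimestamp, and kafka.run are placeholders for my
> actual code):
>
> while (splitIterator.hasNext) {
>   val split = splitIterator.next()
>   readSplit(split, sourceContext)  // emit every record in this sequence file
>   // emit a watermark once the whole file has been read
>   sourceContext.emitWatermark(new Watermark(maxTimestamp(split)))
> }
> kafka.run(sourceContext)           // then hand off to the Kafka source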
>
> I'd love any suggestions around improving this!
>
> Best,
>
> Aaron Levin
>
> On Thu, Nov 1, 2018 at 10:41 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
>> Hi Aaron,
>>
>> I'd like to take a step back and understand why you're trying to wrap an
>> InputFormatSourceFunction.
>>
>> In my opinion, InputFormatSourceFunction should not be used because it
>> has some shortcomings, the most prominent among them being that it does
>> not support checkpointing, i.e. in case of failure all data will
>> (probably) be read again. I say "probably" because the interaction of
>> InputFormatSourceFunction with how InputSplits are generated (which
>> relates to that code snippet with the cast you found) can be somewhat
>> "spooky" and lead to weird results in some cases.
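>>
>> For contrast, a source that does support checkpointing implements
>> CheckpointedFunction and snapshots its read position. A minimal sketch
>> (a toy counter source, just to show the mechanics):
>>
>> class CheckpointedCounterSource extends RichParallelSourceFunction[Long]
>>     with CheckpointedFunction {
>>
>>   @transient private var offsetState: ListState[java.lang.Long] = _
>>   private var offset: Long = 0L
>>   @volatile private var running = true
>>
>>   override def initializeState(ctx: FunctionInitializationContext): Unit = {
>>     offsetState = ctx.getOperatorStateStore.getListState(
>>       new ListStateDescriptor[java.lang.Long]("offset", classOf[java.lang.Long]))
>>     // restore the last checkpointed position, if any
>>     val it = offsetState.get().iterator()
>>     while (it.hasNext) { offset = it.next() }
>>   }
>>
>>   override def snapshotState(ctx: FunctionSnapshotContext): Unit = {
>>     offsetState.clear()
>>     offsetState.add(offset)
>>   }
>>
>>   override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
>>     while (running) {
>>       // emit and advance under the checkpoint lock so the snapshotted
>>       // offset is consistent with what has actually been emitted
>>       ctx.getCheckpointLock.synchronized {
>>         ctx.collect(offset)
>>         offset += 1
>>       }
>>     }
>>   }
>>
>>   override def cancel(): Unit = { running = false }
>> }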
>>
>> The interface is a remnant of a very early version of the streaming API
>> and should probably be removed soon. I hope we can find a solution for
>> your problem that fits better with Flink.
>>
>> Best,
>> Aljoscha
>>
>> On 1. Nov 2018, at 15:30, Aaron Levin <aaronle...@stripe.com> wrote:
>>
>> Hey Friends! Last ping and I'll move this over to a ticket. If anyone can
>> provide any insight or advice, that would be helpful!
>>
>> Thanks again.
>>
>> Best,
>>
>> Aaron Levin
>>
>> On Fri, Oct 26, 2018 at 9:55 AM, Aaron Levin <aaronle...@stripe.com>
>> wrote:
>>
>>> Hey,
>>>
>>> Not sure how convo threading works on this list, so in case the folks
>>> CC'd missed my other response, here's some more info:
>>>
>>> First, I appreciate everyone's help! Thank you!
>>>
>>> I wrote several wrappers to try and debug this, including one that is
>>> an exact copy of `InputFormatSourceFunction`; they all failed with the
>>> same error I detailed above. I'll post two of them below. They all
>>> extended `RichParallelSourceFunction` and, as far as I could tell, were
>>> properly initialized (though I may have missed something!).
>>> Additionally, for the two below, if I change `extends
>>> RichParallelSourceFunction` to `extends InputFormatSourceFunction(...,...)`,
>>> I no longer receive the exception. This is what led me to believe the
>>> source of the issue was casting and how I found the line of code where the
>>> stream graph is given the input format.
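>>>
>>> For reference, the check in question (paraphrased here in Scala; the
>>> real code in StreamGraphGenerator is Java, and this is approximate):
>>>
>>> if (function.isInstanceOf[InputFormatSourceFunction[_]]) {
>>>   val fs = function.asInstanceOf[InputFormatSourceFunction[_]]
>>>   streamGraph.setInputFormat(source.getId, fs.getFormat)
>>> }
>>>
>>> A wrapper is not an instance of InputFormatSourceFunction, so the
>>> format is never registered and no InputSplitAssigner gets created for
>>> the source vertex.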
>>>
>>> Quick explanation of the wrappers:
>>> 1. `WrappedInputFormat` does a basic wrap around
>>> `InputFormatSourceFunction` and delegates all methods to the underlying
>>> `InputFormatSourceFunction`
>>> 2. `ClonedInputFormatSourceFunction` is an almost-exact copy of the
>>> `InputFormatSourceFunction` source.
>>> 3. They're being used in a test which looks vaguely like:
>>> `DataStreamUtils.collect(env.addSource(new WrappedInputFormat(new
>>> InputFormatSourceFunction[String](source,
>>> implicitly[TypeInformation[String]]))).javaStream).asScala.toSeq`
>>>
>>> class WrappedInputFormat[A](
>>>   inputFormat: InputFormatSourceFunction[A]
>>> )(
>>>   implicit typeInfo: TypeInformation[A]
>>> ) extends RichParallelSourceFunction[A] {
>>>
>>>   // Delegate every method to the wrapped InputFormatSourceFunction.
>>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>>     inputFormat.run(sourceContext)
>>>   }
>>>   override def setRuntimeContext(t: RuntimeContext): Unit = {
>>>     inputFormat.setRuntimeContext(t)
>>>   }
>>>   override def equals(obj: scala.Any) = { inputFormat.equals(obj) }
>>>   override def hashCode() = { inputFormat.hashCode() }
>>>   override def toString = { inputFormat.toString }
>>>   override def getRuntimeContext(): RuntimeContext = {
>>>     inputFormat.getRuntimeContext
>>>   }
>>>   override def getIterationRuntimeContext = {
>>>     inputFormat.getIterationRuntimeContext
>>>   }
>>>   override def open(parameters: Configuration): Unit = {
>>>     inputFormat.open(parameters)
>>>   }
>>>   override def cancel(): Unit = {
>>>     inputFormat.cancel()
>>>   }
>>>   override def close(): Unit = {
>>>     inputFormat.close()
>>>   }
>>> }
>>>
>>> And the other one:
>>>
>>> import scala.util.control.Breaks.{break, breakable}
>>>
>>> class ClonedInputFormatSourceFunction[A](
>>>   val format: InputFormat[A, InputSplit],
>>>   val typeInfo: TypeInformation[A]
>>> ) extends RichParallelSourceFunction[A] {
>>>
>>>   @transient private var provider: InputSplitProvider = _
>>>   @transient private var serializer: TypeSerializer[A] = _
>>>   @transient private var splitIterator: Iterator[InputSplit] = _
>>>   private var isRunning: Boolean = _
>>>
>>>   override def open(parameters: Configuration): Unit = {
>>>     val context = getRuntimeContext.asInstanceOf[StreamingRuntimeContext]
>>>     if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>>       format.asInstanceOf[RichInputFormat[_, _]].setRuntimeContext(context)
>>>     }
>>>     format.configure(parameters)
>>>
>>>     provider = context.getInputSplitProvider
>>>     serializer = typeInfo.createSerializer(getRuntimeContext.getExecutionConfig)
>>>     splitIterator = getInputSplits()
>>>     isRunning = splitIterator.hasNext
>>>   }
>>>
>>>   override def run(sourceContext: SourceFunction.SourceContext[A]): Unit = {
>>>     if (isRunning && format.isInstanceOf[RichInputFormat[_, _]]) {
>>>       format.asInstanceOf[RichInputFormat[_, _]].openInputFormat()
>>>     }
>>>
>>>     var nextElement: A = serializer.createInstance()
>>>     try {
>>>       while (isRunning) {
>>>         format.open(splitIterator.next())
>>>         // read the current split until it is exhausted or we are cancelled
>>>         breakable {
>>>           while (isRunning && !format.reachedEnd()) {
>>>             nextElement = format.nextRecord(nextElement)
>>>             if (nextElement != null) {
>>>               sourceContext.collect(nextElement)
>>>             } else {
>>>               break()
>>>             }
>>>           }
>>>         }
>>>         format.close()
>>>         if (isRunning) {
>>>           isRunning = splitIterator.hasNext
>>>         }
>>>       }
>>>     } finally {
>>>       format.close()
>>>       if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>>         format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
>>>       }
>>>       isRunning = false
>>>     }
>>>   }
>>>
>>>   override def cancel(): Unit = {
>>>     isRunning = false
>>>   }
>>>
>>>   override def close(): Unit = {
>>>     format.close()
>>>     if (format.isInstanceOf[RichInputFormat[_, _]]) {
>>>       format.asInstanceOf[RichInputFormat[_, _]].closeInputFormat()
>>>     }
>>>   }
>>>
>>>   private def getInputSplits(): Iterator[InputSplit] = {
>>>     new Iterator[InputSplit] {
>>>       private var nextSplit: InputSplit = _
>>>       private var exhausted: Boolean = _
>>>
>>>       override def hasNext: Boolean = {
>>>         if (exhausted) { return false }
>>>         if (nextSplit != null) { return true }
>>>         var split: InputSplit = null
>>>
>>>         try {
>>>           split = provider.getNextInputSplit(getRuntimeContext.getUserCodeClassLoader)
>>>         } catch {
>>>           case e: InputSplitProviderException =>
>>>             throw new RuntimeException("No InputSplit Provider", e)
>>>         }
>>>
>>>         if (split != null) {
>>>           nextSplit = split
>>>           true
>>>         } else {
>>>           exhausted = true
>>>           false
>>>         }
>>>       }
>>>
>>>       override def next(): InputSplit = {
>>>         if (nextSplit == null && !hasNext) {
>>>           throw new NoSuchElementException()
>>>         }
>>>         val tmp: InputSplit = nextSplit
>>>         nextSplit = null
>>>         tmp
>>>       }
>>>     }
>>>   }
>>> }
>>>
>>>
>>> On Wed, Oct 24, 2018 at 3:54 AM, Dawid Wysakowicz <
>>> dwysakow...@apache.org> wrote:
>>>
>>>> Hi Aaron,
>>>>
>>>> Could you share the code of your custom function?
>>>>
>>>> I am also adding Aljoscha and Kostas to cc, who should be more helpful
>>>> on that topic.
>>>>
>>>> Best,
>>>>
>>>> Dawid
>>>> On 19/10/2018 20:06, Aaron Levin wrote:
>>>>
>>>> Hi,
>>>>
>>>> I'm writing a custom `SourceFunction` which wraps an underlying
>>>> `InputFormatSourceFunction`. When I try to use this `SourceFunction` in a
>>>> stream (via `env.addSource` and a subsequent sink) I get errors related to
>>>> the `InputSplitAssigner` not being initialized for a particular vertex ID.
>>>> Full error here[1].
>>>>
>>>> I believe the underlying error is related to this[0] call to
>>>> `instanceof InputFormatSourceFunction`.
>>>>
>>>> *My questions*:
>>>>
>>>> 1. how can I wrap an `InputFormatSourceFunction` in a way that avoids
>>>> this error? Am I missing a chunk of the API covering this?
>>>> 2. is the error I'm experiencing related to that casting call? If so,
>>>> would y'all be open to a PR which adds an interface one can extend that
>>>> will set the input format in the stream graph? Or is there a preferred
>>>> way of achieving this?
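>>>>
>>>> For question 2, something like the following is what I have in mind
>>>> (entirely hypothetical, just to illustrate the idea):
>>>>
>>>> // a marker trait the stream graph could check for, instead of
>>>> // casting to InputFormatSourceFunction directly
>>>> trait ProvidesInputFormat[A] {
>>>>   def getFormat: InputFormat[A, _ <: InputSplit]
>>>> }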
>>>>
>>>> Thanks!
>>>>
>>>> Aaron Levin
>>>>
>>>> [0] https://github.com/apache/flink/blob/release-1.6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java#L480
>>>> [1]
>>>> java.lang.RuntimeException: Could not retrieve next input split.
>>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:157)
>>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.open(InputFormatSourceFunction.java:71)
>>>>     at REDACTED
>>>>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>>>>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException: Requesting the next input split failed.
>>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:69)
>>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction$1.hasNext(InputFormatSourceFunction.java:155)
>>>>     ... 8 more
>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
>>>>     at org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider.getNextInputSplit(RpcInputSplitProvider.java:61)
>>>>     ... 9 more
>>>> Caused by: java.lang.Exception: No InputSplitAssigner for vertex ID cbc357ccb763df2852fee8c4fc7d55f2
>>>>     at org.apache.flink.runtime.jobmaster.JobMaster.requestNextInputSplit(JobMaster.java:575)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:247)
>>>> ...
>>>>
>>>>
>>>
>>
>>
>
>
