[GitHub] flink pull request #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroO...
Github user soniclavier closed the pull request at: https://github.com/apache/flink/pull/4422 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFo...
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/4422 @aljoscha Thanks for taking a look. I have modified the test.
[GitHub] flink issue #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFo...
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/4422 Link to the passing Travis build: https://travis-ci.org/soniclavier/flink/builds/259243850
[GitHub] flink pull request #4422: [FLINK-7299][AVRO] Write GenericRecord using AvroO...
GitHub user soniclavier opened a pull request: https://github.com/apache/flink/pull/4422

[FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat

## What is the purpose of the change

Allow writing Avro GenericRecord using AvroOutputFormat.

## Brief change log

- Added a condition in AvroOutputFormat that checks whether avroValue is an instance of GenericRecord and, if so, creates a GenericDatumWriter.

## Verifying this change

This change added tests and can be verified as follows:

- Added a unit test, testGenericRecord() in AvroOutputFormatTest, that writes GenericRecords.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: yes
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

- Does this pull request introduce a new feature?: no (not a major feature)
- If yes, how is the feature documented? not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/soniclavier/flink FLINK-7299-GenericRecord-in-AvroOutputFormat

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4422.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4422

commit 1c71ca43bcd5d4733e581f80637b531ba447e9dc
Author: Vishnu Viswanath
Date: 2017-07-31T04:58:28Z

    [FLINK-7299][AVRO] Write GenericRecord using AvroOutputFormat
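As a self-contained illustration of the change this PR describes (choosing the datum writer by the record's runtime type), here is a minimal sketch. The types below are hypothetical stand-ins, not the real Avro classes (GenericRecord, GenericDatumWriter, ReflectDatumWriter), and the exact branch layout in AvroOutputFormat is an assumption:

```java
// Stand-ins for the Avro marker types; the real ones live in org.apache.avro.
interface Record {}
interface GenericRecord extends Record {}

public class WriterDispatch {

    // Mirrors the PR's idea: a GenericRecord needs a generic (schema-driven)
    // writer, everything else keeps the pre-existing writer path.
    // The returned names are illustrative only.
    static String writerFor(Record value) {
        if (value instanceof GenericRecord) {
            return "GenericDatumWriter";
        }
        return "ReflectDatumWriter";
    }

    public static void main(String[] args) {
        System.out.println(writerFor(new GenericRecord() {})); // GenericDatumWriter
        System.out.println(writerFor(new Record() {}));        // ReflectDatumWriter
    }
}
```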
[GitHub] flink pull request #3763: [FLINK-6372][scripts] Fix change scala version of ...
Github user soniclavier closed the pull request at: https://github.com/apache/flink/pull/3763
[GitHub] flink pull request #3763: [FLINK-6372][scripts] Fix change scala version of ...
GitHub user soniclavier opened a pull request: https://github.com/apache/flink/pull/3763

[FLINK-6372][scripts] Fix change scala version of flink-gelly-examples

- changed change-scala-version.sh

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/soniclavier/flink FLINK-6372

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3763.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3763

commit eb8c1e96f0cb64e387947cd194309b9763bed1b0
Author: Vishnu Viswanath
Date: 2017-04-24T16:47:14Z

    [FLINK-6372][scripts] Fix change scala version of flink-gelly-examples
[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2736 I don't think I can edit a closed issue; could you please make the edit?
[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2736 Thank you for your guidance, @aljoscha! Could you please tell me what Fix Version I should set for these JIRAs?
[GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier closed the pull request at: https://github.com/apache/flink/pull/2736
[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2736 @aljoscha I have made the changes as per your comments; could you please review them?
[GitHub] flink pull request #2355: [FLINK-4282]Add Offset Parameter to WindowAssigner...
Github user soniclavier commented on a diff in the pull request: https://github.com/apache/flink/pull/2355#discussion_r86656836

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java ---

```java
@@ -236,6 +236,6 @@ public int compare(TimeWindow o1, TimeWindow o2) {
	 * @return window start
	 */
	public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
```

--- End diff --

Is `windowSize` here actually the slide size? I see that for sliding windows it is called with the slide instead of the size: `TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);`
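The alignment arithmetic under discussion is compact enough to sketch stand-alone. Assuming the method computes `timestamp - (timestamp - offset + windowSize) % windowSize` (my reading of the method, not a verbatim copy of the current source), the third argument is simply whatever granularity the caller aligns to, which for sliding windows is indeed the slide:

```java
public class WindowStartDemo {

    // Assumed reading of TimeWindow.getWindowStartWithOffset: align the
    // timestamp down to the nearest multiple of windowSize, shifted by offset.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        // Tumbling window of size 10, no offset: 23 falls into [20, 30).
        System.out.println(getWindowStartWithOffset(23, 0, 10)); // 20
        // Offset 5 shifts the boundaries: 23 falls into [15, 25).
        System.out.println(getWindowStartWithOffset(23, 5, 10)); // 15
        // Sliding windows pass the slide here, so the result is really
        // the start of the latest pane rather than of a "window".
        System.out.println(getWindowStartWithOffset(23, 0, 5)); // 20
    }
}
```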
[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2736 @aljoscha I have made the changes; could you please review them?
[GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier commented on a diff in the pull request: https://github.com/apache/flink/pull/2736#discussion_r86462637

--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---

```java
@@ -273,32 +284,93 @@ public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
 			return;
 		}

-		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
-		if (triggerResult.isFire()) {
-			fire(context.window, contents);
-		}
+		TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp());
+		if (triggerResult.isFire()) {
+			fire(context.window, contents, windowState);
+		}

 		if (triggerResult.isPurge() || (!windowAssigner.isEventTime() && isCleanupTime(context.window, timer.getTimestamp()))) {
 			cleanup(context.window, windowState, mergingWindows);
 		}
 	}

-	private void fire(W window, Iterable<StreamRecord<IN>> contents) throws Exception {
+	private void fire(W window, Iterable<StreamRecord<IN>> contents, ListState<StreamRecord<IN>> windowState) throws Exception {
 		timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());

 		// Work around type system restrictions...
-		int toEvict = evictor.evict((Iterable) contents, Iterables.size(contents), context.window);
-
-		FluentIterable<IN> projectedContents = FluentIterable
+		FluentIterable<TimestampedValue<IN>> recordsWithTimestamp = FluentIterable
 			.from(contents)
-			.skip(toEvict)
-			.transform(new Function<StreamRecord<IN>, IN>() {
+			.transform(new Function<StreamRecord<IN>, TimestampedValue<IN>>() {
+				@Override
+				public TimestampedValue<IN> apply(StreamRecord<IN> input) {
+					return new TimestampedValue<>(input.getValue(), input.getTimestamp());
+				}
+			});
+		evictorContext.evictBefore(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+		FluentIterable<IN> projectedContents = recordsWithTimestamp
+			.transform(new Function<TimestampedValue<IN>, IN>() {
 				@Override
-				public IN apply(StreamRecord<IN> input) {
+				public IN apply(TimestampedValue<IN> input) {
 					return input.getValue();
 				}
 			});

+		userFunction.apply(context.key, context.window, projectedContents, timestampedCollector);
+		evictorContext.evictAfter(recordsWithTimestamp, Iterables.size(recordsWithTimestamp));
+
+		// work around to fix FLINK-4369, remove the evicted elements from the windowState.
+		// this is inefficient, but there is no other way to remove elements from ListState, which is an AppendingState.
+		windowState.clear();
+		for (TimestampedValue<IN> record : recordsWithTimestamp) {
+			if (record.getTimestamp() < 0) {
```

--- End diff --

Regarding the copy method: are you asking me to add a copy method in TimestampedValue that returns the corresponding StreamRecord, something like this?

```java
/**
 * Creates a {@link StreamRecord} from this TimestampedValue.
 */
public StreamRecord<T> getStreamRecord() {
	StreamRecord<T> streamRecord = new StreamRecord<>(value);
	if (hasTimestamp) {
		streamRecord.setTimestamp(timestamp);
	}
	return streamRecord;
}
```
[GitHub] flink issue #2736: [FLINK-4174] Enhance evictor functionality
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2736 Thanks for the review @aljoscha :thumbsup:, I will make the changes soon.
[GitHub] flink pull request #2736: [FLINK-4174] Enhance evictor functionality
GitHub user soniclavier opened a pull request: https://github.com/apache/flink/pull/2736

[FLINK-4174] Enhance evictor functionality

The PR implements [FLINK-4174](https://issues.apache.org/jira/browse/FLINK-4174) Enhance window evictor, as proposed in [FLIP-4](https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor).

Changes made:
- Modified the Evictor API: added two new methods, evictBefore and evictAfter, and removed the existing evict method.
- Modified the corresponding implementations of CountEvictor, DeltaEvictor and TimeEvictor.
- Created EvictorContext in the class EvictingWindowOperator, which is passed to the evictor methods.
- Created a TimestampedValue class which holds the value with its corresponding timestamp. This class is exposed to the user via the evictBefore and evictAfter methods.
- Modified the EvictingWindowOperator class:
  - to call evictBefore before calling the window function and evictAfter after calling the window function.
  - to create a FluentIterable<TimestampedValue<IN>> from the Iterable<StreamRecord<IN>> contents, which is passed to the evictor methods.
  - to clear the window state and add the remaining elements (after the eviction) back to the state (this fixes [FLINK-4369](https://issues.apache.org/jira/browse/FLINK-4369)).
- Added test cases in EvictingWindowOperatorTest.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/soniclavier/flink FLINK-4174

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2736.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2736

commit bfd4fb509463dea0dc86e702c3aad0b0b9e70ff2
Author: Vishnu Viswanath
Date: 2016-10-31T23:21:04Z

    [FLINK-4174] Enhance evictor functionality
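As a rough illustration of the evictor shape this PR describes — evictBefore/evictAfter operating on TimestampedValue wrappers and removing elements through the iterator — here is a self-contained, simplified sketch. The names follow the PR description, but the bodies are stand-ins, not Flink's actual implementations:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Value plus its timestamp, as exposed to the evictor in the PR description.
class TimestampedValue<T> {
    final T value;
    final long timestamp;
    TimestampedValue(T value, long timestamp) { this.value = value; this.timestamp = timestamp; }
}

// The two-phase evictor API: one pass before the window function, one after.
interface Evictor<T> {
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size);
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size);
}

// Count-based eviction: keep only the newest maxCount elements.
class CountEvictor<T> implements Evictor<T> {
    final long maxCount;
    CountEvictor(long maxCount) { this.maxCount = maxCount; }

    public void evictBefore(Iterable<TimestampedValue<T>> elements, int size) {
        if (size <= maxCount) return;
        int toEvict = size - (int) maxCount;
        Iterator<TimestampedValue<T>> it = elements.iterator();
        for (int i = 0; i < toEvict && it.hasNext(); i++) {
            it.next();
            it.remove(); // mutates the backing collection, as in the PR
        }
    }

    public void evictAfter(Iterable<TimestampedValue<T>> elements, int size) {
        // no-op: this sketch only evicts before the window function runs
    }
}

public class EvictorDemo {
    public static void main(String[] args) {
        List<TimestampedValue<String>> pane = new ArrayList<>(Arrays.asList(
            new TimestampedValue<>("a", 1),
            new TimestampedValue<>("b", 2),
            new TimestampedValue<>("c", 3)));
        new CountEvictor<String>(2).evictBefore(pane, pane.size());
        System.out.println(pane.size());       // 2: the oldest element was evicted
        System.out.println(pane.get(0).value); // b
    }
}
```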
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 One more question: is it possible to configure the JobManager actor path that the client connects to? It looks like it defaults to `akka://flink/user/jobmanager`. That way I could create a much more generic client. Note: I know this is an initial version, just curious whether this is already implemented. :)
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 Sorry, the compilation error was because the Tuple2 was scala.Tuple2, not Flink's Tuple2. Changing to `org.apache.flink.api.java.tuple.Tuple2` fixed the issue.
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051

Thanks Ufuk & Stephen for the reply. I tried the serializers you suggested:

```scala
val typeHint = new TypeHint[Tuple2[Long, String]](){}
val serializer = TypeInformation.of(typeHint).createSerializer(null)

// also tried this
val fieldSerializers = Array[TypeSerializer[_]](StringSerializer.INSTANCE, LongSerializer.INSTANCE)
val serializer2 = new TupleSerializer(
  classOf[Tuple2[Long, String]].asInstanceOf[Class[_]].asInstanceOf[Class[Tuple2[String, Long]]],
  fieldSerializers)
```

But both give me a compilation error at

```scala
val serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace(
  key, serializer2, VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE)
```

The compilation error is:

```
Error:(43, 7) type mismatch;
 found   : org.apache.flink.api.common.typeutils.TypeSerializer[org.apache.flink.api.java.tuple.Tuple2[Long,String]]
 required: org.apache.flink.api.common.typeutils.TypeSerializer[java.io.Serializable]
Note: org.apache.flink.api.java.tuple.Tuple2[Long,String] <: java.io.Serializable, but Java-defined class TypeSerializer is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: java.io.Serializable`. (SLS 3.2.10)
      serializer,
      ^
```

I had seen this before when I tried to set the serializer from `queryableState.getKeySerializer`. Note: it works fine when I use the longer version of the serializer that I created.
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051 Never mind, I was querying with the wrong key; it works now! Cheers.
[GitHub] flink issue #2051: [FLINK-3779] Add support for queryable state
Github user soniclavier commented on the issue: https://github.com/apache/flink/pull/2051

Hi, continuing the discussion from the mailing list: I was able to get past the NettyConfig problem once I ran Flink in cluster mode (I would still like to know if there is a way to run in local mode, so that I can avoid running SBT assembly every time). But now I am stuck at the error message "KvState does not hold any state for key/namespace.", which I believe is because of my KeySerializer. Since I am running the QueryClient as a separate application, I don't have access to my queryableState to call `queryableState.getKeySerializer`. My key is a tuple of (Long, String) and this is the naive serializer that I wrote (which is probably wrong; I have never written a serializer before):

```scala
class KeySerializer extends TypeSerializerSingleton[(Long, String)] {

  private val EMPTY: (Long, String) = (0, "")

  override def createInstance(): (Long, String) = EMPTY

  override def getLength: Int = return 2;

  override def canEqual(o: scala.Any): Boolean = return o.isInstanceOf[(Long, String)]

  override def copy(t: (Long, String)): (Long, String) = t

  override def copy(t: (Long, String), t1: (Long, String)): (Long, String) = t

  override def copy(dataInputView: DataInputView, dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(dataInputView.readLong())
    StringValue.copyString(dataInputView, dataOutputView)
  }

  override def serialize(t: (Long, String), dataOutputView: DataOutputView): Unit = {
    dataOutputView.writeLong(t._1)
    StringValue.writeString(t._2, dataOutputView)
  }

  override def isImmutableType: Boolean = true

  override def deserialize(dataInputView: DataInputView): (Long, String) = {
    val l = dataInputView.readLong()
    val s = StringValue.readString(dataInputView)
    (l, s)
  }

  override def deserialize(t: (Long, String), dataInputView: DataInputView): (Long, String) =
    deserialize(dataInputView)
}
```

Can you tell me what I am doing wrong here? Thanks!
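A note on the error above: queryable state looks keys up by their serialized bytes, so the client's serializer must produce exactly the same bytes as the job's key serializer; also, a variable-length type is generally expected to report -1 from getLength rather than a fixed 2. The stand-alone round trip below only illustrates the write/read symmetry for a (Long, String) key using plain java.io streams; it is not Flink's tuple wire format (writeUTF's length prefix differs from StringValue's encoding, for instance), so treat it as a sketch, not a drop-in client serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

public class KeyRoundTrip {

    // Write the key fields in a fixed order: long first, then the string.
    static byte[] serialize(long l, String s) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(l);
            out.writeUTF(s);
        } catch (IOException e) {
            throw new UncheckedIOException(e); // cannot happen for in-memory streams
        }
        return bytes.toByteArray();
    }

    // The reader must consume the fields in exactly the same order and encoding.
    static Object[] deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new Object[] { in.readLong(), in.readUTF() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Object[] back = deserialize(serialize(42L, "flink"));
        System.out.println(back[0] + "/" + back[1]); // 42/flink
    }
}
```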