[jira] [Created] (FLINK-18698) org.apache.flink.sql.parser.utils.ParserResource compile error

2020-07-23 Thread Jira
毛宗良 created FLINK-18698:
---

 Summary: org.apache.flink.sql.parser.utils.ParserResource compile 
error
 Key: FLINK-18698
 URL: https://issues.apache.org/jira/browse/FLINK-18698
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.12.0
Reporter: 毛宗良
 Attachments: image-2020-07-24-11-42-09-880.png

org.apache.flink.sql.parser.utils.ParserResource in flink-sql-parser imports 
org.apache.flink.sql.parser.impl.ParseException, which cannot be found.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Hi

2020-07-23 Thread Yangze Guo
Hi,

In order to better analyze this problem, it would be good to provide:
- The command and configuration you used to start Flink.
- The logs of the JobManager and TaskManager; in standalone mode you can find
them in the logs/ directory of your Flink distribution.

Best,
Yangze Guo

On Thu, Jul 23, 2020 at 10:10 PM Suryanarayana Murthy Maganti
 wrote:
>
> Hi, I can't run Apache Flink from the command line. The cluster is starting
> automatically and shutting down too
>
> --
> Thanks & Regards,
>
> Maganti Surya
> 6462208546


Re: [DISCUSS] Adding Azure Platform Support in DataStream, Table and SQL Connectors

2020-07-23 Thread Israel Ekpo
Thanks for the guidance Robert. I appreciate the prompt response and will
share the pull requests for the ADLS support [2] next week.

Regarding the additional connectors, I would like to contribute them to the
main codebase [1] if possible.

I initially thought maybe it is better to start outside the core codebase
but I think the adoption would be better if we have documentation on the
core Flink documentation and reduce the level of effort necessary to
integrate it for users. We will take time to make sure the docs and tests
for the connectors are solid and then we can bring them one at a time into
the core code base.

[1] https://github.com/apache/flink/tree/master/flink-connectors

[2] https://issues.apache.org/jira/browse/FLINK-18562


On Thu, Jul 23, 2020 at 3:41 AM Robert Metzger  wrote:

> Hi Israel,
>
> thanks a lot for reaching out! I'm very excited about your efforts to bring
> additional Azure support into Flink.
> There are ~50 threads on the user@ mailing list mentioning Azure -- that's
> good evidence that our users use Flink in Azure.
>
> Regarding your questions:
>
> Do I need to create a FLIP [2] in order to make these changes to bring the
> > new capabilities or the individual JIRA issues are sufficient?
>
>
> For the two DataLake FS tickets, I don't see the need for a FLIP.
>
> I am thinking about targeting Flink versions 1.10 through 1.12
> > For new connectors like this, how many versions can/should this be
> > integrated into?
>
>
> We generally don't add new features to old releases (unless there's a very
> good reason to backport the feature). Therefore, the new integrations will
> all go into the next major Flink release (in this case probably Flink 1.12
> for the first tickets).
>
> Are there any upcoming changes to supported Java and Scala versions that I
> > need to be aware of?
>
>
> No, I'm not aware of any upcoming changes. Java 8 and Java 11 are the two
> versions we test against.
>
> My goal is to add the first two features [FLINK-18562] and [FLINK-18568] to
> > the existing file system capabilities [1] and then have the other
> > connectors FLINK-1856[3-7] exist as standalone plugins.
>
>
> I like the order in which you approach the tickets. I assigned you to the
> first ticket and commented on the second one. I'm also willing to help
> review your pull requests.
>
> What do you mean by "standalone plugins" in the context of connectors?
> Would you like to contribute these connectors to the main Flink codebase,
> or maintain them outside Flink but list them in flink-packages.org?
>
> Best,
> Robert
>
>
> On Wed, Jul 22, 2020 at 10:43 AM Israel Ekpo  wrote:
>
> > I have opened the following issues to track new efforts to bring
> additional
> > Azure Support in the following areas to the connectors ecosystem.
> >
> > My goal is to add the first two features [FLINK-18562] and [FLINK-18568]
> to
> > the existing file system capabilities [1] and then have the other
> > connectors FLINK-1856[3-7] exist as standalone plugins.
> >
> > As more users adopt the additional connectors, we could incrementally
> bring
> > them into the core code base if necessary with sufficient support.
> >
> > I am new to the process, so I have a few questions:
> >
> > Do I need to create a FLIP [2] in order to make these changes to bring
> the
> > new capabilities or the individual JIRA issues are sufficient?
> >
> > I am thinking about targeting Flink versions 1.10 through 1.12
> > For new connectors like this, how many versions can/should this be
> > integrated into?
> >
> > Are there any upcoming changes to supported Java and Scala versions that
> I
> > need to be aware of?
> >
> > Any ideas or suggestions you have would be great.
> >
> > Below is a summary of the JIRA issues that were created to track the
> effort
> >
> > Add Support for Azure Data Lake Store Gen 2 in Flink File System
> > https://issues.apache.org/jira/browse/FLINK-18562
> >
> > Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink
> > https://issues.apache.org/jira/browse/FLINK-18568
> >
> > Add Support for Azure Cosmos DB DataStream Connector
> > https://issues.apache.org/jira/browse/FLINK-18563
> >
> > Add Support for Azure Event Hub DataStream Connector
> > https://issues.apache.org/jira/browse/FLINK-18564
> >
> > Add Support for Azure Event Grid DataStream Connector
> > https://issues.apache.org/jira/browse/FLINK-18565
> >
> > Add Support for Azure Cognitive Search DataStream Connector
> > https://issues.apache.org/jira/browse/FLINK-18566
> >
> > Add Support for Azure Cognitive Search Table & SQL Connector
> > https://issues.apache.org/jira/browse/FLINK-18567
> >
> >
> > [1] https://github.com/apache/flink/tree/master/flink-filesystems
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
> >
>


Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release candidate #4)

2020-07-23 Thread Kurt Young
From my experience, Java profilers are sometimes not accurate enough to
find the root cause of a performance regression.
In this case, I would suggest you try out Intel VTune Amplifier
to look at more detailed metrics.

Best,
Kurt


On Fri, Jul 24, 2020 at 8:51 AM Thomas Weise  wrote:

> The cause of the issue is anything but clear.
>
> Previously I had mentioned that there is no suspect change to the Kinesis
> connector and that I had reverted the AWS SDK change to no effect.
>
> https://issues.apache.org/jira/browse/FLINK-17496 actually fixed another
> regression in the previous release and is present before and after.
>
> I repeated the run with 1.11.0 core and downgraded the entire Kinesis
> connector to 1.10.1: Nothing changes, i.e. the regression is still present.
> Therefore we will need to look elsewhere for the root cause.
>
> Regarding the time spent in snapshotState, repeat runs reveal a wide range
> for both versions, 1.10 and 1.11. So again this is nothing pointing to a
> root cause.
>
> At this point, I have no ideas remaining other than doing a bisect to find
> the culprit. Any other suggestions?
>
> Thomas
>
>
> On Thu, Jul 16, 2020 at 9:19 PM Zhijiang wrote:
>
> > Hi Thomas,
> >
> > Thanks for your further profiling information; glad to see we have already
> > narrowed down the location causing the regression.
> > Actually I was also suspicious of #snapshotState in previous
> > discussions since it can indeed block normal operator processing for quite
> > some time.
> >
> > Based on your feedback below, the sleep time during #snapshotState might
> > be the main concern, and I also dug into the implementation of
> > FlinkKinesisProducer#snapshotState.
> > while (producer.getOutstandingRecordsCount() > 0) {
> >producer.flush();
> >try {
> >   Thread.sleep(500);
> >} catch (InterruptedException e) {
> >   LOG.warn("Flushing was interrupted.");
> >   break;
> >}
> > }
> > It seems that the sleep time is mainly affected by the internal
> operations
> > inside KinesisProducer implementation provided by amazonaws, which I am
> not
> > quite familiar with.
> > But I noticed there were two upgrades related to it in release-1.11.0.
> One
> > is for upgrading amazon-kinesis-producer to 0.14.0 [1] and another is for
> > upgrading aws-sdk-version to 1.11.754 [2].
> > You mentioned that you already reverted the SDK upgrade to verify no
> > changes. Did you also revert the [1] to verify?
> > [1] https://issues.apache.org/jira/browse/FLINK-17496
> > [2] https://issues.apache.org/jira/browse/FLINK-14881
> >
> > Best,
> > Zhijiang
> > --
> > From:Thomas Weise 
> > Send Time: Friday, Jul 17, 2020, 05:29
> > To:dev 
> > Cc:Zhijiang ; Stephan Ewen  >;
> > Arvid Heise ; Aljoscha Krettek  >
> > Subject:Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release
> > candidate #4)
> >
> > Sorry for the delay.
> >
> > I confirmed that the regression is due to the sink (unsurprising, since
> > another job with the same consumer, but not the producer, runs as
> > expected).
> >
> > As promised I did CPU profiling on the problematic application, which
> gives
> > more insight into the regression [1]
> >
> > The screenshots show that the average time for snapshotState increases
> from
> > ~9s to ~28s. The data also shows the increase in sleep time during
> > snapshotState.
> >
> > Does anyone, based on changes made in 1.11, have a theory why?
> >
> > I had previously looked at the changes to the Kinesis connector and also
> > reverted the SDK upgrade, which did not change the situation.
> >
> > It will likely be necessary to drill into the sink / checkpointing
> details
> > to understand the cause of the problem.
> >
> > Let me know if anyone has specific questions that I can answer from the
> > profiling results.
> >
> > Thomas
> >
> > [1]
> >
> >
> https://docs.google.com/presentation/d/159IVXQGXabjnYJk3oVm3UP2UW_5G-TGs_u9yzYb030I/edit?usp=sharing
> >
> > On Mon, Jul 13, 2020 at 11:14 AM Thomas Weise  wrote:
> >
> > > + dev@ for visibility
> > >
> > > I will investigate further today.
> > >
> > >
> > > On Wed, Jul 8, 2020 at 4:42 AM Aljoscha Krettek 
> > > wrote:
> > >
> > >> On 06.07.20 20:39, Stephan Ewen wrote:
> > >> >- Did sink checkpoint notifications change in a relevant way, for
> > >> example
> > >> > due to some Kafka issues we addressed in 1.11 (@Aljoscha maybe?)
> > >>
> > >> I think that's unrelated: the Kafka fixes were isolated in Kafka and
> the
> > >> one bug I discovered on the way was about the Task reaper.
> > >>
> > >>
> > >> On 07.07.20 17:51, Zhijiang wrote:
> > >> > Sorry for my misunderstanding of the previous information, Thomas. I
> was
> > >> assuming that the sync checkpoint duration increased after upgrade as
> it
> > >> was mentioned before.
> > >> >
> > >> > If I remembered correctly, the memory state backend also has the
> same
> > >> issue? If so, we can dismiss the rocksDB state 

Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release candidate #4)

2020-07-23 Thread Thomas Weise
The cause of the issue is anything but clear.

Previously I had mentioned that there is no suspect change to the Kinesis
connector and that I had reverted the AWS SDK change to no effect.

https://issues.apache.org/jira/browse/FLINK-17496 actually fixed another
regression in the previous release and is present before and after.

I repeated the run with 1.11.0 core and downgraded the entire Kinesis
connector to 1.10.1: Nothing changes, i.e. the regression is still present.
Therefore we will need to look elsewhere for the root cause.

Regarding the time spent in snapshotState, repeat runs reveal a wide range
for both versions, 1.10 and 1.11. So again this is nothing pointing to a
root cause.

At this point, I have no ideas remaining other than doing a bisect to find
the culprit. Any other suggestions?

Thomas


On Thu, Jul 16, 2020 at 9:19 PM Zhijiang 
wrote:

> Hi Thomas,
>
> Thanks for your further profiling information; glad to see we have already
> narrowed down the location causing the regression.
> Actually I was also suspicious of #snapshotState in previous
> discussions since it can indeed block normal operator processing for quite
> some time.
>
> Based on your feedback below, the sleep time during #snapshotState might
> be the main concern, and I also dug into the implementation of
> FlinkKinesisProducer#snapshotState.
> while (producer.getOutstandingRecordsCount() > 0) {
>producer.flush();
>try {
>   Thread.sleep(500);
>} catch (InterruptedException e) {
>   LOG.warn("Flushing was interrupted.");
>   break;
>}
> }
> It seems that the sleep time is mainly affected by the internal operations
> inside KinesisProducer implementation provided by amazonaws, which I am not
> quite familiar with.
> But I noticed there were two upgrades related to it in release-1.11.0. One
> is for upgrading amazon-kinesis-producer to 0.14.0 [1] and another is for
> upgrading aws-sdk-version to 1.11.754 [2].
> You mentioned that you already reverted the SDK upgrade to verify no
> changes. Did you also revert the [1] to verify?
> [1] https://issues.apache.org/jira/browse/FLINK-17496
> [2] https://issues.apache.org/jira/browse/FLINK-14881
>
> Best,
> Zhijiang
> --
> From:Thomas Weise 
> Send Time: Friday, Jul 17, 2020, 05:29
> To:dev 
> Cc:Zhijiang ; Stephan Ewen ;
> Arvid Heise ; Aljoscha Krettek 
> Subject:Re: Kinesis Performance Issue (was [VOTE] Release 1.11.0, release
> candidate #4)
>
> Sorry for the delay.
>
> I confirmed that the regression is due to the sink (unsurprising, since
> another job with the same consumer, but not the producer, runs as
> expected).
>
> As promised I did CPU profiling on the problematic application, which gives
> more insight into the regression [1]
>
> The screenshots show that the average time for snapshotState increases from
> ~9s to ~28s. The data also shows the increase in sleep time during
> snapshotState.
>
> Does anyone, based on changes made in 1.11, have a theory why?
>
> I had previously looked at the changes to the Kinesis connector and also
> reverted the SDK upgrade, which did not change the situation.
>
> It will likely be necessary to drill into the sink / checkpointing details
> to understand the cause of the problem.
>
> Let me know if anyone has specific questions that I can answer from the
> profiling results.
>
> Thomas
>
> [1]
>
> https://docs.google.com/presentation/d/159IVXQGXabjnYJk3oVm3UP2UW_5G-TGs_u9yzYb030I/edit?usp=sharing
>
> On Mon, Jul 13, 2020 at 11:14 AM Thomas Weise  wrote:
>
> > + dev@ for visibility
> >
> > I will investigate further today.
> >
> >
> > On Wed, Jul 8, 2020 at 4:42 AM Aljoscha Krettek 
> > wrote:
> >
> >> On 06.07.20 20:39, Stephan Ewen wrote:
> >> >- Did sink checkpoint notifications change in a relevant way, for
> >> example
> >> > due to some Kafka issues we addressed in 1.11 (@Aljoscha maybe?)
> >>
> >> I think that's unrelated: the Kafka fixes were isolated in Kafka and the
> >> one bug I discovered on the way was about the Task reaper.
> >>
> >>
> >> On 07.07.20 17:51, Zhijiang wrote:
> >> > Sorry for my misunderstanding of the previous information, Thomas. I was
> >> assuming that the sync checkpoint duration increased after upgrade as it
> >> was mentioned before.
> >> >
> >> > If I remembered correctly, the memory state backend also has the same
> >> issue? If so, we can dismiss the rocksDB state changes. As the slot
> sharing
> >> enabled, the downstream and upstream should
> >> > probably deployed into the same slot, then no network shuffle effect.
> >> >
> >> > I think we need to find out whether it has other symptoms changed
> >> besides the performance regression to further figure out the scope.
> >> > E.g. any metrics changes, the number of TaskManager and the number of
> >> slots per TaskManager from deployment changes.
> >> > 40% regression is really big, I guess the changes should also be
> >> reflected in other places.
> >> >
> >> > I 

[Discuss] Questions about SortedMapState in Stream API

2020-07-23 Thread Sean Z
Hi Flink Community,

I'm new to this community but have been using Flink for a year or so. As a
user, my team built stuff upon Flink stateful stream processing. We used to
have a use case that we need a sorted map data structure to store our
state, something like TreeMap in Java, to query higher/lower keys, do range
queries, etc. However, currently we only have a MapState interface in
Flink. We found that RocksDB is sorted storage by nature. To achieve
our use case, we had to do some hacky tricks to bypass the current limitations
and use lower-level RocksDB features as a workaround to implement our own
SortedMapState. I assume that there are lots of other users who have
the same use case, so I have a few questions here.
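
For illustration, here is a rough sketch of the kind of interface we have in
mind. It is purely hypothetical (modeled on java.util.NavigableMap and the
existing MapState), not an existing Flink API:

import java.util.Map;
import org.apache.flink.api.common.state.MapState;

public interface SortedMapState<UK, UV> extends MapState<UK, UV> {

    // Smallest entry with a key greater than or equal to the given key, or null.
    Map.Entry<UK, UV> ceilingEntry(UK key) throws Exception;

    // Largest entry with a key less than or equal to the given key, or null.
    Map.Entry<UK, UV> floorEntry(UK key) throws Exception;

    // Entries whose keys fall in [fromKey, toKey), iterated in key order.
    Iterable<Map.Entry<UK, UV>> subMap(UK fromKey, UK toKey) throws Exception;
}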

1. Do we have a feature like SortedMapState already in place, in
development or in the future roadmap?

2. If not, could that be a good feature to have? and are there any other
concerns?

Just want to start a discussion here from a user perspective. If everything
goes well, we are also interested in contributing back to the community.


Best regards,
Sean


[jira] [Created] (FLINK-18697) Adding flink-table-api-java-bridge_2.11 to a Flink job kills the IDE logging

2020-07-23 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-18697:
--

 Summary: Adding flink-table-api-java-bridge_2.11 to a Flink job 
kills the IDE logging
 Key: FLINK-18697
 URL: https://issues.apache.org/jira/browse/FLINK-18697
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / API
Affects Versions: 1.11.0
Reporter: Robert Metzger


Steps to reproduce:
- Set up a Flink project using a Maven archetype
- Add "flink-table-api-java-bridge_2.11" as a dependency
- Running Flink won't produce any log output

Probable cause:
"flink-table-api-java-bridge_2.11" has a dependency to 
"org.apache.flink:flink-streaming-java_2.11:test-jar:tests:1.11.0", which 
contains a "log4j2-test.properties" file.

When I enable Log4j2 debug output (with "-Dlog4j2.debug"), I see the following 
line:
{code}
DEBUG StatusLogger Reconfiguration complete for context[name=3d4eac69] at URI 
jar:file:/Users/robert/.m2/repository/org/apache/flink/flink-streaming-java_2.11/1.11.0/flink-streaming-java_2.11-1.11.0-tests.jar!/log4j2-test.properties
 (org.apache.logging.log4j.core.LoggerContext@568bf312) with optional 
ClassLoader: null
{code}





--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Inconsistent behavior between different states with ListState.get().iterator().remove()

2020-07-23 Thread Ken Krugler
Hi devs,

If you use the FsStateBackend (or MemoryStateBackend), and you have ListState, 
then you can get an iterator and remove() an entry, and it all works as 
expected.

If you use the RocksDBStateBackend, the remove() call doesn’t throw an 
exception, but the ListState isn’t updated.

Seems like either you should get an exception w/the remove() call, or the 
operation should work as expected.
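
As a minimal sketch of the pattern I'm describing (class and field names are
made up for illustration; assumes a keyed stream of Long values):

import java.util.Iterator;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PruneNegativesFunction extends KeyedProcessFunction<String, Long, Long> {

    private transient ListState<Long> values;

    @Override
    public void open(Configuration parameters) {
        values = getRuntimeContext().getListState(
                new ListStateDescriptor<>("values", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
        values.add(value);
        // With FsStateBackend/MemoryStateBackend this prunes the stored list;
        // with RocksDBStateBackend the remove() silently leaves the list unchanged.
        Iterator<Long> it = values.get().iterator();
        while (it.hasNext()) {
            if (it.next() < 0) {
                it.remove();
            }
        }
        out.collect(value);
    }
}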

I see https://issues.apache.org/jira/browse/FLINK-5651, though that seems only to 
be talking about FsStateBackend/MemoryStateBackend.

And I don’t understand the comment on that issue: "Actually, it can be fine to 
use Iterator#remove() as long as the user does not reply on these changes in 
the backing store”.

Thanks,

— Ken

PS - I understand there are many reasons to not remove arbitrary elements from 
a ListState when using RocksDB (serde cost for entire list), so I’d be in favor 
of the remove() call throwing an exception, at least with RocksDB.

--
Ken Krugler
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr



[jira] [Created] (FLINK-18696) Java 14 Records are not recognized as POJOs

2020-07-23 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-18696:
--

 Summary: Java 14 Records are not recognized as POJOs
 Key: FLINK-18696
 URL: https://issues.apache.org/jira/browse/FLINK-18696
 Project: Flink
  Issue Type: Improvement
  Components: API / Type Serialization System
Reporter: Robert Metzger


Having a record (https://openjdk.java.net/jeps/359) such as:
{code}
public record Simple(int id, String name) { }
{code}

Leads to this log message: "class de.robertmetzger.TableApiSql$Simple does not 
contain a setter for field id"

I believe the PojoSerializer should be able to use records.
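
For comparison, a sketch of the hand-written POJO shape that the current rules 
recognize; a record exposes {{id()}}/{{name()}} accessors, final fields and no 
setters, so it fails these checks:

{code:java}
// Hand-written POJO equivalent of the record above: public class, public
// no-arg constructor, and getters/setters for every field.
public class Simple {
    private int id;
    private String name;

    public Simple() {}

    public Simple(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() { return id; }
    public void setId(int id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
{code}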



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18695) Allow NettyBufferPool to allocate heap buffers

2020-07-23 Thread Chesnay Schepler (Jira)
Chesnay Schepler created FLINK-18695:


 Summary: Allow NettyBufferPool to allocate heap buffers
 Key: FLINK-18695
 URL: https://issues.apache.org/jira/browse/FLINK-18695
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / Network
Reporter: Chesnay Schepler
 Fix For: 1.12.0


In 4.1.43, Netty changed its SslHandler to always use heap buffers 
for JDK SSLEngine implementations, to avoid an additional memory copy.

However, our {{NettyBufferPool}} forbids heap buffer allocations.

We will either have to allow heap buffer allocations, or create a custom 
SslHandler implementation that does not use heap buffers (although this seems 
ill-advised?).
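
A rough sketch of the first option, under the assumption that 
{{NettyBufferPool}} extends Netty's {{PooledByteBufAllocator}} and currently 
rejects heap allocations (class name and constructor arguments are 
illustrative, and Flink's Netty shading is ignored here):

{code:java}
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class HeapFriendlyNettyBufferPool extends PooledByteBufAllocator {

    public HeapFriendlyNettyBufferPool(int numberOfArenas) {
        // preferDirect = true keeps direct buffers as the default allocation type;
        // 8192/11 are Netty's default page size and max order.
        super(true, numberOfArenas, numberOfArenas, 8192, 11);
    }

    @Override
    public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
        // Instead of rejecting heap buffers, delegate to the pooled allocator so the
        // JDK-SSLEngine-based SslHandler can obtain the heap buffers it now requests.
        return super.heapBuffer(initialCapacity, maxCapacity);
    }
}
{code}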

/cc [~sewen] [~uce] [~NicoK]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18694) Add unaligned checkpoint config to web ui

2020-07-23 Thread Kboh (Jira)
Kboh created FLINK-18694:


 Summary: Add unaligned checkpoint config to web ui
 Key: FLINK-18694
 URL: https://issues.apache.org/jira/browse/FLINK-18694
 Project: Flink
  Issue Type: Improvement
  Components: Runtime / REST, Runtime / Web Frontend
Reporter: Kboh


h2. What is the purpose of the change
 * Show in web ui if unaligned checkpoints are enabled.

h2. Brief change log
 * Adds unaligned checkpoint config to REST endpoint, and web ui.

 

[https://github.com/apache/flink/pull/12962]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18693) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-23 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-18693:


 Summary: AvroSerializationSchema does not work with types 
generated by avrohugger
 Key: FLINK-18693
 URL: https://issues.apache.org/jira/browse/FLINK-18693
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.10.2, 1.12.0, 1.11.1


The main problem is that the code in {{SpecificData.createSchema()}} tries to 
reflectively read the {{SCHEMA$}} field, that is normally there in Avro 
generated classes. However, avrohugger generates this field in a companion 
object, which the reflective Java code will therefore not find.
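
For illustration, a sketch of how the schema could be located for an 
avrohugger-generated class, where {{SCHEMA$}} lives on the Scala companion 
object ({{Foo$.MODULE$}}) rather than on the class itself (this is not the 
actual Avro/Flink implementation, just the idea):

{code:java}
import org.apache.avro.Schema;

final class CompanionSchemaLookup {

    static Schema findSchema(Class<?> type) throws ReflectiveOperationException {
        try {
            // Java-generated Avro classes: public static final Schema SCHEMA$ on the class.
            return (Schema) type.getDeclaredField("SCHEMA$").get(null);
        } catch (NoSuchFieldException e) {
            // avrohugger: the companion class Foo$ exposes SCHEMA$ via its MODULE$ singleton.
            Class<?> companion = Class.forName(type.getName() + "$");
            Object module = companion.getDeclaredField("MODULE$").get(null);
            return (Schema) companion.getMethod("SCHEMA$").invoke(module);
        }
    }
}
{code}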

This is also described in these ML threads:
 * 
[https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
 * 
[https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18692) AvroSerializationSchema does not work with types generated by avrohugger

2020-07-23 Thread Aljoscha Krettek (Jira)
Aljoscha Krettek created FLINK-18692:


 Summary: AvroSerializationSchema does not work with types 
generated by avrohugger
 Key: FLINK-18692
 URL: https://issues.apache.org/jira/browse/FLINK-18692
 Project: Flink
  Issue Type: Bug
  Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
 Fix For: 1.10.2, 1.12.0, 1.11.1


The main problem is that the code in {{SpecificData.createSchema()}} tries to 
reflectively read the {{SCHEMA$}} field, that is normally there in Avro 
generated classes. However, avrohugger generates this field in a companion 
object, which the reflective Java code will therefore not find.

This is also described in these ML threads:
 * 
[https://lists.apache.org/thread.html/5db58c7d15e4e9aaa515f935be3b342fe036e97d32e1fb0f0d1797ee@%3Cuser.flink.apache.org%3E]
 * 
[https://lists.apache.org/thread.html/cf1c5b8fa7f095739438807de9f2497e04ffe55237c5dea83355112d@%3Cuser.flink.apache.org%3E]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-129: Refactor Descriptor API to register connector in Table API

2020-07-23 Thread Jark Wu
Hi Timo,

That's a good point I missed in the design. I have updated the FLIP and
added a note under the `KafkaConnector` to mention this.
I will not list all the method names in the FLIP as the design doc is super
long now.


Hi Dawid,

1) KafkaConnector not extends TableDescriptor
The reason why KafkaConnector extends TableDescriptor is that, a builder
pattern "KafkaConnector.newBuilder()...build()" should return
"KafkaConnector" in theory.
So users can write something like the following code which might be
more intuitive.

KafkaConnector kafka = KafkaConnector.newBuilder()...build();
tEnv.createTemporaryTable("MyTable", kafka);

But I agree the connector implementation will be simpler if this is not
strongly needed, e.g. we don't need the generic type for the descriptor, and
we don't need to pass the descriptor class in the builder. So I'm also fine
with not extending it if others are not against it. What's your opinion here
@Timo Walther?

2) LikeOptions
I am not very satisfied with the new design, because the API is not very
fluent. Users will be interrupted to consider what the `overwrite()`
parameter should be, and the API design doesn't protect users from using the
wrong options before running the code.
What about listing all possible options at one level? This would be more
aligned with the SQL DDL and easier for users to understand and use.

public enum LikeOption {
  INCLUDING_ALL,
  INCLUDING_CONSTRAINTS,
  INCLUDING_GENERATED,
  INCLUDING_OPTIONS,
  INCLUDING_PARTITIONS,
  INCLUDING_WATERMARKS,

  EXCLUDING_ALL,
  EXCLUDING_CONSTRAINTS,
  EXCLUDING_GENERATED,
  EXCLUDING_OPTIONS,
  EXCLUDING_PARTITIONS,
  EXCLUDING_WATERMARKS,

  OVERWRITING_GENERATED,
  OVERWRITING_OPTIONS
}

3) register the table under a generated table path
I'm afraid we have to do that. The generated table path is still needed for
`TableSourceTable#tableIdentifier` which is used to calculate the digest.
This requires that the registered table must have an unique identifier. The
old `TableSourceQueryOperation` will also generate the identifier according
 to the hashcode of the TableSource object. However, the generated
identifier "Unregistered_TableSource_1234" is still possible to be in
conflict with
the user's table path. Therefore, I prefer to register the generated name
in the (temporary) catalog to throw explicit exceptions, rather than
generating a wrong plan.


Hi @Leonard Xu  and @Jingsong Li 
 ,

Do you have other concerns on the latest FLIP and the above discussion?

Best,
Jark

On Thu, 23 Jul 2020 at 18:26, Dawid Wysakowicz 
wrote:

> Hi Jark,
>
> Thanks for the update. I think the FLIP looks really well on the high
> level.
>
> I have a few comments to the code structure in the FLIP:
>
> 1) I really don't like how the TableDescriptor exposes protected fields.
> Moreover why do we need to extend from it? I don't think we need
> KafkaConnector extends TableDescriptor and alike. We only need the builders
> e.g. the KafkaConnectorBuilder.
>
> If I understand it correctly this is the interface needed from the
> TableEnvironment perspective and it is the contract that the
> TableEnvironment expects. I would suggest making it an interface:
> @PublicEvolving
> public interface TableDescriptor {
> List<String> getPartitionedFields();
>   Schema getSchema();
> Map<String, String> getOptions();
> LikeOption[] getLikeOptions();
> String getLikePath();
> }
>
> Then the TableDescriptorBuilder would work with an internal implementation
> of this interface
> @PublicEvolving
> public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> {
>
> private final InternalTableDescriptor descriptor = new
> InternalTableDescriptor();
>
> /**
>  * Returns the this builder instance in the type of subclass.
>  */
> protected abstract BUILDER self();
>
> /**
>  * Specifies the table schema.
>  */
> public BUILDER schema(Schema schema) {
> descriptor.schema = schema;
> return self();
> }
>
> /**
>  * Specifies the partition keys of this table.
>  */
> public BUILDER partitionedBy(String... fieldNames) {
> checkArgument(descriptor.partitionedFields.isEmpty(), 
> "partitionedBy(...)
> shouldn't be called more than once.");
> descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
> return self();
> }
>
> /**
>  * Extends some parts from the original registered table path.
>  */
> public BUILDER like(String tablePath, LikeOption... likeOptions) {
> descriptor.likePath = tablePath;
> descriptor.likeOptions = likeOptions;
> return self();
> }
>
> protected BUILDER option(String key, String value) {
> descriptor.options.put(key, value);
> return self();
> }
>
> /**
>  * Returns created table descriptor.
>  */
> public TableDescriptor build() {
> 

[jira] [Created] (FLINK-18691) add HiveCatalog Construction method with HiveConf

2020-07-23 Thread Jun Zhang (Jira)
Jun Zhang created FLINK-18691:
-

 Summary: add HiveCatalog Construction method with HiveConf
 Key: FLINK-18691
 URL: https://issues.apache.org/jira/browse/FLINK-18691
 Project: Flink
  Issue Type: Improvement
  Components: Connectors / Hive
Affects Versions: 1.11.1
Reporter: Jun Zhang
 Fix For: 1.12.0


Currently HiveCatalog has two public constructors. Both require a hiveConfDir 
parameter, which is the path to a local Hive configuration directory. However, 
when we submit a job in Application mode, the job is submitted on the master 
node of the cluster, where there may be no Hive configuration, so we cannot 
obtain a local Hive conf path. We should therefore add a public constructor 
that takes a HiveConf, which is more convenient for users.
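
A usage sketch of the proposed constructor (the HiveConf-based constructor does 
not exist yet; its signature and the surrounding names, e.g. {{tableEnv}}, are 
illustrative only):

{code:java}
// Build the Hive configuration programmatically instead of pointing to a local hive-conf directory.
HiveConf hiveConf = new HiveConf();
hiveConf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore-host:9083");

// Proposed constructor: pass the HiveConf directly.
HiveCatalog hiveCatalog = new HiveCatalog("myhive", "default", hiveConf, "2.3.6");
tableEnv.registerCatalog("myhive", hiveCatalog);
tableEnv.useCatalog("myhive");
{code}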



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Hi

2020-07-23 Thread Suryanarayana Murthy Maganti
Hi, I can't run Apache Flink from the command line. The cluster is starting
automatically and shutting down too

-- 
Thanks & Regards,

Maganti Surya
6462208546


[jira] [Created] (FLINK-18689) Deterministic Slot Sharing

2020-07-23 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-18689:
---

 Summary: Deterministic Slot Sharing
 Key: FLINK-18689
 URL: https://issues.apache.org/jira/browse/FLINK-18689
 Project: Flink
  Issue Type: New Feature
  Components: Runtime / Coordination
Reporter: Andrey Zagrebin


[Design 
doc|https://docs.google.com/document/d/10CbCVDBJWafaFOovIXrR8nAr2BnZ_RAGFtUeTHiJplw]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18690) Implement LocalInputPreferredSlotSharingStrategy

2020-07-23 Thread Andrey Zagrebin (Jira)
Andrey Zagrebin created FLINK-18690:
---

 Summary: Implement LocalInputPreferredSlotSharingStrategy
 Key: FLINK-18690
 URL: https://issues.apache.org/jira/browse/FLINK-18690
 Project: Flink
  Issue Type: Sub-task
Reporter: Andrey Zagrebin
Assignee: Zhu Zhu


Implement ExecutionSlotSharingGroup, SlotSharingStrategy and default 
LocalInputPreferredSlotSharingStrategy.

The default strategy would be LocalInputPreferredSlotSharingStrategy. It will 
try to reduce remote data exchanges. Subtasks, which are connected and belong 
to the same SlotSharingGroup, tend to be put in the same 
ExecutionSlotSharingGroup.

See [design 
doc|https://docs.google.com/document/d/10CbCVDBJWafaFOovIXrR8nAr2BnZ_RAGFtUeTHiJplw/edit#heading=h.t4vfmm4atqoy]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18687) ProjectionCodeGenerator#generateProjectionExpression should remove for loop optimization

2020-07-23 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-18687:
---

 Summary: ProjectionCodeGenerator#generateProjectionExpression 
should remove for loop optimization
 Key: FLINK-18687
 URL: https://issues.apache.org/jira/browse/FLINK-18687
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.0
Reporter: Caizhi Weng
 Fix For: 1.12.0


If too many fields of the same type are projected, 
{{ProjectionCodeGenerator#generateProjectionExpression}} currently performs a 
"for loop optimization" which, instead of generating code separately for each 
field, squashes them into a for loop.

However, if the indices of the fields with the same type are not continuous, 
this optimization will not write fields in index ascending order. This is not 
acceptable because {{BinaryWriter}}s expect the users to write fields in index 
ascending order (that is to say, we *have to* first write field 0, then field 
1, then...), otherwise the variable length area of the two binary rows with 
same data might be different. Although we can use {{getXX}} methods of 
{{BinaryRow}} to get the fields correctly, states for streaming jobs compare 
state keys with binary bits, not with the contents of the keys. So we need to 
make sure the binary bits of the binary rows be the same if two rows contain 
the same data.

What's worse, as the current implementation of 
{{ProjectionCodeGenerator#generateProjectionExpression}} uses a scala 
{{HashMap}}, the key order of the map might be different on different workers; 
Even if the projection does not meet the condition to be optimized, it will 
still be affected by this bug.

What I suggest is to simply remove this optimization. Because if we still want 
this optimization, we have to make sure that the fields of the same type have 
continuous order, which is a very strict and rare condition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18688) ProjectionCodeGenerator#generateProjectionExpression should remove for loop optimization

2020-07-23 Thread Caizhi Weng (Jira)
Caizhi Weng created FLINK-18688:
---

 Summary: ProjectionCodeGenerator#generateProjectionExpression 
should remove for loop optimization
 Key: FLINK-18688
 URL: https://issues.apache.org/jira/browse/FLINK-18688
 Project: Flink
  Issue Type: Bug
  Components: Table SQL / Runtime
Affects Versions: 1.11.0
Reporter: Caizhi Weng
 Fix For: 1.12.0


If too many fields of the same type are projected, 
{{ProjectionCodeGenerator#generateProjectionExpression}} currently performs a 
"for loop optimization" which, instead of generating code separately for each 
field, squashes them into a for loop.

However, if the indices of the fields with the same type are not continuous, 
this optimization will not write fields in index ascending order. This is not 
acceptable because {{BinaryWriter}} expects the users to write fields in index 
ascending order (that is to say, we *have to* first write field 0, then field 
1, then...), otherwise the variable length area of the two binary rows with 
same data might be different. Although we can use {{getXX}} methods of 
{{BinaryRow}} to get the fields correctly, states for streaming jobs compare 
state keys with binary bits, not with the contents of the keys. So we need to 
make sure the binary bits of the binary rows be the same if two rows contain 
the same data.

What's worse, as the current implementation of 
{{ProjectionCodeGenerator#generateProjectionExpression}} uses a scala 
{{HashMap}}, the key order of the map might be different on different workers; 
Even if the projection does not meet the condition to be optimized, it will 
still be affected by this bug.

What I suggest is to simply remove this optimization. Because if we still want 
this optimization, we have to make sure that the fields of the same type have 
continuous order, which is a very strict and rare condition.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18686) Getting the emit time when `table.exec.emit.early-fire.enabled` is true

2020-07-23 Thread hailong wang (Jira)
hailong wang created FLINK-18686:


 Summary: Getting the emit time when 
`table.exec.emit.early-fire.enabled` is true
 Key: FLINK-18686
 URL: https://issues.apache.org/jira/browse/FLINK-18686
 Project: Flink
  Issue Type: Improvement
Reporter: hailong wang


We can turn `table.exec.emit.early-fire.enabled` on to let windows fire early, 
but users often want to know the emit time as well.

So could we support an auxiliary function for this, maybe something like 
TUMBLE_EMIT or HOP_EMIT?

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Planning Flink 1.12

2020-07-23 Thread Till Rohrmann
Thanks for being our release managers for the 1.12 release Dian & Robert!

Here are some features I would like to work on for this release:

# Features

## Finishing pipelined region scheduling (
https://issues.apache.org/jira/browse/FLINK-16430)
With the pipelined region scheduler we want to implement a scheduler which
can serve streaming and batch workloads alike while being able to
run jobs under constrained resources. The latter is particularly important
for bounded streaming jobs which, currently, are not well supported.

## Reactive-scaling mode
Being able to react to newly available resources and rescaling a running
job accordingly will make Flink's operation much easier because resources
can then be controlled by an external tool (e.g. GCP autoscaling, K8s
horizontal pod scaler, etc.). In this release we want to make a big step
towards this direction. As a first step we want to support the execution of
jobs with a parallelism which is lower than the specified parallelism in
case that Flink lost a TaskManager or could not acquire enough resources.

# Maintenance/Stability

## JM / TM finished task reconciliation (
https://issues.apache.org/jira/browse/FLINK-17075)
This prevents the system from going out of sync if a task state change from
the TM to the JM is lost.

## Make metrics services work with Kubernetes deployments (
https://issues.apache.org/jira/browse/FLINK-11127)
Invert the direction in which the MetricFetcher connects to the
MetricQueryFetchers. That way it will no longer be necessary to expose on
K8s for every TaskManager a port on which the MetricQueryFetcher runs. This
will then make the deployment of Flink clusters on K8s easier.

## Handle long-blocking operations during job submission (savepoint
restore) (https://issues.apache.org/jira/browse/FLINK-16866)
Submitting a Flink job can involve the interaction with external systems
(blocking operations). Depending on the job the interactions can take so
long that it exceeds the submission timeout which reports a failure on the
client side even though the actual submission succeeded. By decoupling the
creation of the ExecutionGraph from the job submission, we can make the job
submission non-blocking which will solve this problem.

## Make IDs more intuitive to ease debugging (FLIP-118) (
https://issues.apache.org/jira/browse/FLINK-15679)
By making the internal Flink IDs compositional or logging how they belong
together, we can make the debugging of Flink's operations much easier.

Cheers,
Till


On Thu, Jul 23, 2020 at 7:48 AM Canbin Zheng  wrote:

> Hi All,
>
> Thanks for bring-up this discussion, Robert!
> Congratulations on becoming the release manager of 1.12, Dian and Robert !
>
> --
> Here are some of my thoughts of the features for native integration with
> Kubernetes in Flink 1.12:
>
> 1. Support user-specified pod templates
> Description:
> The current approach of introducing new configuration options for each
> aspect of the pod specification a user might wish to customize is becoming unwieldy; we have
> to maintain more and more Flink side Kubernetes configuration options and
> users have to learn the gap between the declarative model used by
> Kubernetes and the configuration model used by Flink. It's a great
> improvement to allow users to specify pod templates as central places for
> all customization needs for the jobmanager and taskmanager pods.
> Benefits:
> Users can leverage many of the advanced K8s features that the Flink
> community does not support explicitly, such as volume mounting, DNS
> configuration, pod affinity/anti-affinity setting, etc.
>
> 2. Support running PyFlink on Kubernetes
> Description:
> Support running PyFlink on Kubernetes, including session cluster and
> application cluster.
> Benefits:
> Running python application in a containerized environment.
>
> 3. Support built-in init-Container
> Description:
> We need a built-in init-Container to help solve dependency management
> in a containerized environment, especially in the application mode.
> Benefits:
> Separate the base Flink image from dynamic dependencies.
>
> 4. Support accessing secured services via K8s secrets
> Description:
> Kubernetes Secrets
>  can be used to
> provide credentials for a Flink application to access secured services. It
> helps people who want to use a user-specified K8s Secret through an
> environment variable.
> Benefits:
> Improve user experience.
>
> 5. Support configuring replica of JobManager Deployment in ZooKeeper HA
> setups
> Description:
> Make the *replica* of Deployment configurable in the ZooKeeper HA
> setups.
> Benefits:
> Achieve faster failover.
>
> 6. Support to configure limit for CPU requirement
> Description:
> To leverage the Kubernetes feature of container request/limit CPU.
> Benefits:
> Reduce cost.
>
> Regards,
> Canbin Zheng
>
> Harold.Miao  

Re: Is it possible to do state migration with checkpoints?

2020-07-23 Thread Sivaprasanna
Adding dev@ to get some traction. Any help would be greatly appreciated.

Thanks.

On Thu, Jul 23, 2020 at 11:48 AM Sivaprasanna 
wrote:

> +user...@flink.apache.org 
>
> A follow up question. I tried taking a savepoint but the job failed
> immediately. It happens every time I take a savepoint. The job is running on
> a Yarn cluster so it fails with "container running out of memory". The
> state size averages around 1.2G but also peaks to ~4.5 GB sometimes (please
> refer to the screenshot below). The job is running with 2GB task manager
> heap & 2GB task manager managed memory. I increased the managed memory to
> 6GB assuming the failure has something to do with RocksDB but it failed
> even with 6GB managed memory. I guess I am missing on some configurations.
> Can you folks please help me with this?
>
> [image: Screenshot 2020-07-23 at 10.34.29 AM.png]
>
> On Wed, Jul 22, 2020 at 7:32 PM Sivaprasanna 
> wrote:
>
>> Hi,
>>
>> We are trying out state schema migration for one of our stateful
>> pipelines. We use few Avro type states. Changes made to the job:
>> 1. Updated the schema for one of the states (added a new 'boolean'
>> field with default value).
>> 2. Modified the code by removing a couple of ValueStates.
>>
>> To push these changes, I stopped the live job and resubmitted the new jar
>> with the latest *checkpoint* path. However, the job failed with the
>> following error:
>>
>> java.lang.RuntimeException: Error while getting state
>> at
>> org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
>> at
>> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
>> ...
>> ...
>> Caused by: org.apache.flink.util.StateMigrationException: The new state
>> serializer cannot be incompatible.
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:543)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:491)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:652)
>>
>> I was going through the state schema evolution doc. The document mentions
>> that we need to take a *savepoint* and restart the job with the savepoint
>> path. We are using RocksDB backend with incremental checkpoint enabled. Can
>> we not use the latest checkpoint available when we are dealing with state
>> schema changes?
>>
>> Complete stacktrace is attached with this mail.
>>
>> -
>> Sivaprasanna
>>
>


[jira] [Created] (FLINK-18685) JobClient.getAccumulators() blocks until streaming job has finished in local environment

2020-07-23 Thread Robert Metzger (Jira)
Robert Metzger created FLINK-18685:
--

 Summary: JobClient.getAccumulators() blocks until streaming job 
has finished in local environment
 Key: FLINK-18685
 URL: https://issues.apache.org/jira/browse/FLINK-18685
 Project: Flink
  Issue Type: Bug
  Components: API / DataStream
Affects Versions: 1.11.0
Reporter: Robert Metzger


*Steps to reproduce:*
{code:java}
JobClient client = env.executeAsync("Test");

CompletableFuture<JobStatus> status = client.getJobStatus();
LOG.info("status = " + status.get());

CompletableFuture<Map<String, Object>> accumulators = 
client.getAccumulators(StreamingJob.class.getClassLoader());
LOG.info("accus = " + accumulators.get(5, TimeUnit.SECONDS));
{code}

*Actual behavior*
The accumulators future will never complete for a streaming job when calling 
this just in your main() method from the IDE.

*Expected behavior*
Receive the accumulators of the running streaming job.
The JavaDocs of the method state the following: "Accumulators can be requested 
while it is running or after it has finished.". 
While it is technically true that I can request accumulators, I was expecting 
as a user that I can access the accumulators of a running job.
Also, I can request accumulators if I submit the job to a cluster.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (FLINK-18684) Performance regression around 2020.07.22 in serialization

2020-07-23 Thread Roman Khachatryan (Jira)
Roman Khachatryan created FLINK-18684:
-

 Summary: Performance regression around 2020.07.22 in serialization
 Key: FLINK-18684
 URL: https://issues.apache.org/jira/browse/FLINK-18684
 Project: Flink
  Issue Type: Bug
  Components: Benchmarks
Affects Versions: 1.12.0
Reporter: Roman Khachatryan
Assignee: Roman Khachatryan


{quote}Performance regression
[serializerHeavyString|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=serializerHeavyString=2=200=off=on=on]:
median=242, last=203, dev=39, median of last 10=203, trend change: -17,
median dev=4, ratio=9, threshold=6
[serializerKryo|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=serializerKryo=2=200=off=on=on]:
median=245, last=206, dev=39, median of last 10=222, trend change: -10,
median dev=5, ratio=7, threshold=6
[serializerPojo|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=serializerPojo=2=200=off=on=on]:
median=640, last=605, dev=35, median of last 10=595, trend change: -8,
median dev=23, ratio=1, threshold=6
[serializerRow|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=serializerRow=2=200=off=on=on]:
median=865, last=810, dev=55, median of last 10=820, trend change: -6,
median dev=18, ratio=3, threshold=6
[serializerTuple|http://codespeed.dak8s.net:8000/timeline/#/?exe=1,3=serializerTuple=2=200=off=on=on]:
median=1009, last=958, dev=51, median of last 10=968, trend change: -5,
median dev=18, ratio=2, threshold=6{quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[DISCUSS] Align the semantic of returning null from InputFormat#nextRecord

2020-07-23 Thread Benchao Li
Hi all,

I'd like to discuss about the semantic of returning null from
InputFormat#nextRecord.

For now, there is no explicit Javadoc about this, and there are three ways
to handle it in Flink:
1. treat null as the end of input
2. skip null
3. assumes that the value from InputFormat#nextRecord cannot be null

I quickly searched the Flink codebase for these usages:
- org.apache.flink.api.common.operators.GenericDataSourceBase [2]
- org.apache.flink.api.java.io.CsvInputFormat [2]
- org.apache.flink.runtime.operators.DataSourceTask [2]
-
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator
[2]
- org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction
[1]
- org.apache.flink.table.sources.CsvTableSource [1]
- org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
- org.apache.flink.table.filesystem.FileSystemLookupFunction [3]

I think we can align these behaviors. Regarding the alignment, I personally
lean towards #2.

Going a step further, when would InputFormat#nextRecord return null?
One scenario is that we encountered dirty data and want to skip it.
Actually, we faced the same problem with
org.apache.flink.api.common.serialization.DeserializationSchema
in the past, and in 1.11 we added a method `void deserialize(byte[]
message, Collector<T> out)`.
Its default behavior is to ignore null values.

Then could we also add a method `void nextRecord(OT reuse, Collector<OT>
collector)` to InputFormat?
Adding this method would enable us to return multiple records in one call,
which is very flexible when implementing an InputFormat.
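
A rough sketch of what such an addition could look like, mirroring the
Collector-based DeserializationSchema#deserialize (this is only a proposal,
not existing API):

// Default method on the InputFormat interface, so existing implementations keep compiling.
default void nextRecord(OT reuse, Collector<OT> collector) throws IOException {
    OT record = nextRecord(reuse);
    if (record != null) {
        // Aligns on behavior #2: a null return value is skipped, not treated as end of input.
        collector.collect(record);
    }
}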

WDYT?

-- 

Best,
Benchao Li


Re: [DISCUSS] FLIP-129: Refactor Descriptor API to register connector in Table API

2020-07-23 Thread Dawid Wysakowicz
Hi Jark,

Thanks for the update. I think the FLIP looks really well on the high level.

I have a few comments to the code structure in the FLIP:

1) I really don't like how the TableDescriptor exposes protected fields.
Moreover why do we need to extend from it? I don't think we need
KafkaConnector extends TableDescriptor and alike. We only need the
builders e.g. the KafkaConnectorBuilder.

If I understand it correctly this is the interface needed from the
TableEnvironment perspective and it is the contract that the
TableEnvironment expects. I would suggest making it an interface:

@PublicEvolving
public interface TableDescriptor {
    List<String> getPartitionedFields();
    Schema getSchema();
    Map<String, String> getOptions();
    LikeOption[] getLikeOptions();
    String getLikePath();
}

Then the TableDescriptorBuilder would work with an internal
implementation of this interface

@PublicEvolving
public abstract class TableDescriptorBuilder<BUILDER extends TableDescriptorBuilder<BUILDER>> {

    private final InternalTableDescriptor descriptor = new InternalTableDescriptor();

    /**
     * Returns the this builder instance in the type of subclass.
     */
    protected abstract BUILDER self();

    /**
     * Specifies the table schema.
     */
    public BUILDER schema(Schema schema) {
        descriptor.schema = schema;
        return self();
    }

    /**
     * Specifies the partition keys of this table.
     */
    public BUILDER partitionedBy(String... fieldNames) {
        checkArgument(descriptor.partitionedFields.isEmpty(),
            "partitionedBy(...) shouldn't be called more than once.");
        descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
        return self();
    }

    /**
     * Extends some parts from the original registered table path.
     */
    public BUILDER like(String tablePath, LikeOption... likeOptions) {
        descriptor.likePath = tablePath;
        descriptor.likeOptions = likeOptions;
        return self();
    }

    protected BUILDER option(String key, String value) {
        descriptor.options.put(key, value);
        return self();
    }

    /**
     * Returns created table descriptor.
     */
    public TableDescriptor build() {
        return descriptor;
    }
}


2) I'm also not the biggest fan of how the LikeOptions are suggested in
the doc. Can't we have something more like

class LikeOption {

    public enum MergingStrategy {
        INCLUDING,
        EXCLUDING,
        OVERWRITING
    }

    public enum FeatureOption {
        ALL,
        CONSTRAINTS,
        GENERATED,
        OPTIONS,
        PARTITIONS,
        WATERMARKS
    }

    private final MergingStrategy mergingStrategy;
    private final FeatureOption featureOption;

    public static LikeOption including(FeatureOption option) {
        return new LikeOption(MergingStrategy.INCLUDING, option);
    }

    public static LikeOption overwriting(FeatureOption option) {
        Preconditions.checkArgument(option != ALL && ...);
        return new LikeOption(MergingStrategy.OVERWRITING, option);
    }
}


3) TableEnvironment#from(descriptor) will register descriptor under a
system generated table path (just like TableImpl#toString) first, and
scan from the table path to derive the Table. Table#executeInsert() does
it in a similar way.

I would try not to register the table under a generated table path. Do
we really need that? I am pretty sure we can use the tables without
registering them in a catalog. Similarly to the old
TableSourceQueryOperation.

Otherwise looks good.

Best,

Dawid
On 23/07/2020 10:35, Timo Walther wrote:
> Hi Jark,
>
> thanks for the update. I think the FLIP is in a really good shape now
> and ready to be voted. If others have no further comments?
>
> I have one last comment around the methods of the descriptor builders.
> When refactoring classes such as `KafkaConnector` or
> `ElasticsearchConnector`. We should align the method names with the
> new property names introduced in FLIP-122:
>
> KafkaConnector.newBuilder()
>   // similar to scan.startup.mode=earliest-offset
>   .scanStartupModeEarliest()
>   // similar to sink.partitioner=round-robin
>   .sinkPartitionerRoundRobin()
>
> What do you think?
>
> Thanks for driving this,
> Timo
>
>
> On 22.07.20 17:26, Jark Wu wrote:
>> Hi all,
>>
>> After some offline discussion with other people, I'm also fine with
>> using
>> the builder pattern now,
>>   even though I still think the `.build()` method is a little verbose
>> in the
>> user code.
>>
>> I have updated the FLIP with following changes:
>>
>> 1) use builder pattern instead of "new" keyword. In order to avoid
>> duplicate code and reduce development burden for connector developers,
>>    

[jira] [Created] (FLINK-18683) Support @DataTypeHint for TableFunction output types

2020-07-23 Thread Fabian Hueske (Jira)
Fabian Hueske created FLINK-18683:
-

 Summary: Support @DataTypeHint for TableFunction output types
 Key: FLINK-18683
 URL: https://issues.apache.org/jira/browse/FLINK-18683
 Project: Flink
  Issue Type: Sub-task
  Components: Table SQL / API
Reporter: Fabian Hueske


For ScalarFunctions, the return type of an eval method can be declared with a 
{{@DataTypeHint}}:


{code:java}
@DataTypeHint("INT")
public Integer eval(Integer value) {
  return value * 2;
}{code}

This does not work for TableFunctions because the {{@DataTypeHint}} annotation 
refers to the {{void}} return type. Hence, {{TableFunction}} {{eval()}} methods 
must always be annotated with the more complex {{@FunctionHint}} annotation.
However, I think that in this context, it is clear that the {{@DataTypeHint}} 
annotation refers to the actual return type of the table function (the type 
parameter of {{TableFunction}}).



We could consider allowing {{@DataTypeHint}} annotations also on 
{{TableFunction}} classes (defining the output type of all eval methods) and 
{{eval()}} methods.
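
For illustration, today a {{TableFunction}} has to carry the more verbose 
{{@FunctionHint}}; the proposed shorter form is shown in the comment below and 
is not supported yet:

{code:java}
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

// Current state: the output row type must be declared via @FunctionHint.
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
public class SplitFunction extends TableFunction<Row> {
    public void eval(String str) {
        for (String s : str.split(" ")) {
            collect(Row.of(s, s.length()));
        }
    }
}

// Proposed: allow @DataTypeHint directly on the class (or eval method),
// interpreted as the type parameter of TableFunction rather than the void return type:
//
// @DataTypeHint("ROW<word STRING, length INT>")
// public class SplitFunction extends TableFunction<Row> { ... }
{code}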



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [ANNOUNCE] New PMC member: Piotr Nowojski

2020-07-23 Thread Matthias Pohl
Congratulations, Piotr!

On Wed, Jul 22, 2020 at 8:52 AM Forward Xu  wrote:

> Congratulations!
>
>
> Best,
>
> Forward
>
> godfrey he wrote on Wed, Jul 22, 2020 at 10:45 AM:
>
> > Congratulations!
> >
> > Best,
> > Godfrey
> >
> > Till Rohrmann wrote on Tue, Jul 21, 2020 at 10:46 PM:
> >
> > > Congrats, Piotr!
> > >
> > > Cheers,
> > > Till
> > >
> > > On Thu, Jul 9, 2020 at 4:15 AM Xingcan Cui  wrote:
> > >
> > > > Congratulations, Piotr!
> > > >
> > > > Best, Xingcan
> > > >
> > > > On Wed, Jul 8, 2020, 21:53 Yang Wang  wrote:
> > > >
> > > > > Congratulations Piotr!
> > > > >
> > > > >
> > > > > Best,
> > > > > Yang
> > > > >
> > > > > Dan Zou wrote on Wed, Jul 8, 2020 at 10:36 PM:
> > > > >
> > > > > > Congratulations!
> > > > > >
> > > > > > Best,
> > > > > > Dan Zou
> > > > > >
> > > > > > > On Jul 8, 2020, at 5:25 PM, godfrey he wrote:
> > > > > > >
> > > > > > > Congratulations
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>


-- 

Matthias Pohl | Engineer

Follow us @VervericaData Ververica 

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Yip Park Tung Jason, Jinwei (Kevin) Zhang, Karl Anton
Wehner


[jira] [Created] (FLINK-18682) Vector orc reader cannot read Hive 2.0.0 table

2020-07-23 Thread Rui Li (Jira)
Rui Li created FLINK-18682:
--

 Summary: Vector orc reader cannot read Hive 2.0.0 table
 Key: FLINK-18682
 URL: https://issues.apache.org/jira/browse/FLINK-18682
 Project: Flink
  Issue Type: Bug
  Components: Connectors / Hive, Formats (JSON, Avro, Parquet, ORC, 
SequenceFile)
Affects Versions: 1.11.1
Reporter: Rui Li






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] FLIP-129: Refactor Descriptor API to register connector in Table API

2020-07-23 Thread Timo Walther

Hi Jark,

thanks for the update. I think the FLIP is in really good shape now and ready to be voted 
on, if others have no further comments.


I have one last comment around the methods of the descriptor builders. When refactoring 
classes such as `KafkaConnector` or `ElasticsearchConnector`, we should align the method 
names with the new property names introduced in FLIP-122:


KafkaConnector.newBuilder()
  // similar to scan.startup.mode=earliest-offset
  .scanStartupModeEarliest()
  // similar to sink.partitioner=round-robin
  .sinkPartitionerRoundRobin()

What do you think?

Thanks for driving this,
Timo


On 22.07.20 17:26, Jark Wu wrote:

Hi all,

After some offline discussion with other people, I'm also fine with using
the builder pattern now,
  even though I still think the `.build()` method is a little verbose in the
user code.

I have updated the FLIP with following changes:

1) use builder pattern instead of "new" keyword. In order to avoid duplicate code and 
reduce the development burden for connector developers, I introduced the abstract classes 
`TableDescriptorBuilder` and `FormatDescriptorBuilder`. All the common methods are 
pre-defined in the base builder class, and all custom descriptor builders should extend 
from these base builder classes. This way, we can add more methods to the base builder 
class in the future without changes in the connectors (a small sketch of this idea follows 
the list below).
2) use Expression instead of SQL expression string for computed column and
watermark strategy
3) use `watermark(rowtime, expr)` as the watermark method.
4) `Schema.column()` accepts `AbstractDataType` instead of `DataType`
5) drop Schema#proctime and Schema#watermarkFor#boundedOutOfOrderTimestamps
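
As a purely illustrative side note on point 1, a minimal self-contained sketch of such a 
shared base builder could look like the following (class and method names here are 
simplified assumptions, not the FLIP's final API):

import java.util.HashMap;
import java.util.Map;

// common methods live once in the generic base class and return the concrete builder type
abstract class TableDescriptorBuilderSketch<SELF extends TableDescriptorBuilderSketch<SELF>> {

    protected final Map<String, String> options = new HashMap<>();

    // pre-defined once for all connectors
    public SELF option(String key, String value) {
        options.put(key, value);
        return self();
    }

    public Map<String, String> build() {
        return new HashMap<>(options);
    }

    protected abstract SELF self();
}

// a connector-specific builder only contributes its own methods
class KafkaConnectorBuilderSketch
        extends TableDescriptorBuilderSketch<KafkaConnectorBuilderSketch> {

    public KafkaConnectorBuilderSketch topic(String topic) {
        return option("topic", topic);
    }

    @Override
    protected KafkaConnectorBuilderSketch self() {
        return this;
    }
}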

A full example will look like this:

tEnv.createTemporaryTable(
 "MyTable",
 KafkaConnector.newBuilder()
 .version("0.11")
 .topic("user_logs")
 .property("bootstrap.servers", "localhost:9092")
 .property("group.id", "test-group")
 .startFromEarliest()
 .sinkPartitionerRoundRobin()
 .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
 .schema(
 Schema.newBuilder()
 .column("user_id", DataTypes.BIGINT())
 .column("user_name", DataTypes.STRING())
 .column("score", DataTypes.DECIMAL(10, 2))
 .column("log_ts", DataTypes.STRING())
 .column("part_field_0", DataTypes.STRING())
 .column("part_field_1", DataTypes.INT())
 .column("proc", proctime()) // define a processing-time
attribute with column name "proc"
 .column("ts", toTimestamp($("log_ts")))
 .watermark("ts", $("ts").minus(lit(3).seconds()))
 .primaryKey("user_id")
 .build())
 .partitionedBy("part_field_0", "part_field_1")  // Kafka doesn't
support partitioned table yet, this is just an example for the API
 .build()
);

I hope this resolves all your concerns. Welcome for further feedback!

Updated FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API#FLIP129:RefactorDescriptorAPItoregisterconnectorsinTableAPI-FormatDescriptor

POC:
https://github.com/wuchong/flink/tree/descriptor-POC/flink-table/flink-table-common/src/main/java/org/apache/flink/table/descriptor3

Best,
Jark

On Thu, 16 Jul 2020 at 20:18, Jark Wu  wrote:


Thank you all for the discussion!

Here are my comments:

2) I agree we should support Expression as a computed column, but I'm in favor of 
Leonard's point that maybe we can also support a SQL string expression as a computed 
column, because that keeps things aligned with DDL. The concern with Expression is that 
converting an Expression to a SQL string, or (de)serializing an Expression, is a separate 
topic that is not yet clear and may involve a lot of work.
Maybe we can support Expression later if time permits.

6,7) I still prefer the "new" keyword over the builder. I don't think immutability is a 
strong reason. I care more about usability and the experience from the users' and 
developers' perspective.
   - Users need to type more words if using builder:
`KafkaConnector.newBuilder()...build()`  vs `new KafkaConnector()...`
   - It's more difficult for developers to write a descriptor.  2 classes
(KafkaConnector and KafkaConnectorBuilder) + 5 more methods (builders,
schema, partitionedBy, like, etc..).
 With the "new" keyword all the common methods are defined by the
framework.
   - It's hard to have the same API style for different connectors, because
the common methods are defined by users. For example, some may have
`withSchema`, `partitionKey`, `withLike`, etc...

8) I'm -1 to `ConfigOption`. The ConfigOption is not used on `JsonFormat`,
but the generic `Connector#option`. This doesn't work when using format
options.

new Connector("kafka")
  .option(JsonOptions.IGNORE_PARSE_ERRORS, true);   // this is wrong,
because "kafka" requires "json.ignore-parse-errors" as the 

Re: Flink Redis connectivity

2020-07-23 Thread 范超
Hi Ramya

I just tried to code an example that works in 1.10, using a custom RichFlatMapFunction to 
open the connection, transform the data, and release the connection in its overridden 
close() method.

// app.java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RedisMapDemo {
    public static void main(String[] args) throws Exception {
        // 1. source
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        DataStream<String> sourceStream = env.fromElements("test_value");

        // 2. custom flatMap function that enriches every element from Redis
        DataStream<String> redisUpdatedStream = sourceStream.flatMap(new RedisFlatMap());

        redisUpdatedStream.print();
        env.execute("testing redis flatmap");
    }
}

// this should be saved as another java file (RedisFlatMap.java)
import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RedisFlatMap extends RichFlatMapFunction<String, String> {
    String TEST_REDIS_KEY = "my_first_lettuce_key";
    RedisClient redisClient;
    StatefulRedisConnection<String, String> connection;
    RedisCommands<String, String> syncCommands;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // open one connection per parallel task when the operator starts
        redisClient = RedisClient.create("redis://localhost:6379/0");
        connection = redisClient.connect();
        syncCommands = connection.sync();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // release the connection when the operator shuts down
        connection.close();
        redisClient.shutdown();
    }

    @Override
    public void flatMap(String inputString, Collector<String> out) throws Exception {
        // 1. write to redis
        // syncCommands.set(TEST_REDIS_KEY, " Hello, Redis!");

        // 2. read from redis
        String tmpValue = syncCommands.get(TEST_REDIS_KEY);

        // 3. transform and emit
        out.collect(inputString + " - " + tmpValue);
    }
}
-----Original Message-----
From: Ramya Ramamurthy [mailto:hair...@gmail.com]
Sent: Tuesday, July 21, 2020 18:42
To: dev@flink.apache.org
Subject: Flink Redis connectivity

Hi,

As per the understanding we have from the documentation, I guess its not 
possible to take the redis connection within the Data Stream. In that case, how 
should i proceed ? How can i access a DB client object within the stream ??

I am using Flink 1.7. any help here would be appreciated. Thanks.

  RedisClient redisClient = new
RedisClient(RedisURI.create("redis://localhost:6379"));
RedisConnection client = redisClient.connect(); 
DataStream parsedStream = ipdetailsStream.map((MapFunction) value -> {

String ct = value.getField(5).toString();

String res = "";
if (ct.equals("14") || ct.equals("4")) {

res = client.set("key", "val");
}
return res;
});

Thanks,


Re: Flink Redis connectivity

2020-07-23 Thread Yangze Guo
Do you need to read the state you maintained from Redis? The flink-connector-redis only
contains a sink operator.
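
For reference, a minimal sketch of what that sink side typically looks like, assuming the
bahir flink-connector-redis dependency (the class names below are illustrative; it writes a
Tuple2<String, String> as a plain Redis SET):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class RedisSinkSketch {

    // maps each Tuple2 to a Redis SET command: f0 becomes the key, f1 the value
    public static class SetMapper implements RedisMapper<Tuple2<String, String>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.SET);
        }

        @Override
        public String getKeyFromData(Tuple2<String, String> data) {
            return data.f0;
        }

        @Override
        public String getValueFromData(Tuple2<String, String> data) {
            return data.f1;
        }
    }

    public static void addRedisSink(DataStream<Tuple2<String, String>> stream) {
        FlinkJedisPoolConfig conf =
                new FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build();
        stream.addSink(new RedisSink<>(conf, new SetMapper()));
    }
}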

Best,
Yangze Guo

On Thu, Jul 23, 2020 at 3:28 PM Ramya Ramamurthy  wrote:
>
> Hi,
>
> Thanks for your response.
>
> I am trying to maintain some state in redis, and for each incoming packet,
> I try to map the information on redis, and then finally use ES as a sink to
> push the data.
> But with this flink-connector-redis, I am not sure if the same can be
> achieved. Can you please elaborate on this , so it would be very helpful.
>
> Thank you.
>
>
> On Wed, Jul 22, 2020 at 9:29 AM Yun Tang  wrote:
>
> > Hi Ramya
> >
> > Have you ever tried flink-connector-redis<
> > https://github.com/apache/bahir-flink/tree/master/flink-connector-redis>
> > in bahir [1][2]? I think you could use it or obtain some insights.
> >
> > [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
> > [2] https://github.com/apache/bahir-flink
> >
> > Best
> > Yun Tang
> >
> > 
> > From: Yangze Guo 
> > Sent: Tuesday, July 21, 2020 18:50
> > To: dev 
> > Subject: Re: Flink Redis connectivity
> >
> > Hi,
> >
> > I think you could implement `RichMapFunction` and create `redisClient`
> > in the `open` method.
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy 
> > wrote:
> > >
> > > Hi,
> > >
> > > As per the understanding we have from the documentation, I guess its not
> > > possible to take the redis connection within the Data Stream. In that
> > case,
> > > how should i proceed ? How can i access a DB client object within the
> > > stream ??
> > >
> > > I am using Flink 1.7. any help here would be appreciated. Thanks.
> > >
> > >   RedisClient redisClient = new
> > > RedisClient(RedisURI.create("redis://localhost:6379"));
> > > RedisConnection client =
> > > redisClient.connect();
> > > DataStream parsedStream = ipdetailsStream.map((MapFunction > > String>) value -> {
> > >
> > > String ct = value.getField(5).toString();
> > >
> > > String res = "";
> > > if (ct.equals("14") || ct.equals("4")) {
> > >
> > > res = client.set("key", "val");
> > > }
> > > return res;
> > > });
> > >
> > > Thanks,
> >


[jira] [Created] (FLINK-18681) The jar package version conflict causes the task to continue to increase and grab resources

2020-07-23 Thread wangtaiyang (Jira)
wangtaiyang created FLINK-18681:
---

 Summary: The jar package version conflict causes the task to 
continue to increase and grab resources
 Key: FLINK-18681
 URL: https://issues.apache.org/jira/browse/FLINK-18681
 Project: Flink
  Issue Type: Bug
Affects Versions: 1.11.0
Reporter: wangtaiyang


When I submit a Flink job to YARN, the default resource configuration is 1 GB and 1 core, 
but in fact the job keeps requesting more and more resources: 2 cores, 3 cores, and so on, 
up to 200 cores. I then looked at the JobManager (JM) log and found the following error:
{code:java}
java.lang.NoSuchMethodError: org.apache.commons.cli.Option.builder(Ljava/lang/String;)Lorg/apache/commons/cli/Option$Builder;
 at org.apache.flink.runtime.entrypoint.parser.CommandLineOptions.<clinit>(CommandLineOptions.java:28) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at org.apache.flink.runtime.clusterframework.BootstrapTools.lambda$getDynamicPropertiesAsString$0(BootstrapTools.java:648) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) ~[?:1.8.0_191]
...
java.lang.NoClassDefFoundError: Could not initialize class org.apache.flink.runtime.entrypoint.parser.CommandLineOptions
 at org.apache.flink.runtime.clusterframework.BootstrapTools.lambda$getDynamicPropertiesAsString$0(BootstrapTools.java:648) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
 at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267) ~[?:1.8.0_191]
 at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1553) ~[?:1.8.0_191]
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) ~[?:1.8.0_191]
{code}
It was finally confirmed that this is caused by a commons-cli version conflict, but the 
failing job does not stop and keeps requesting more and more resources. Is this a bug?
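
For what it's worth, a quick illustrative way to confirm which jar the conflicting class is 
actually loaded from (run with the same classpath as the job):

{code:java}
public class WhichJar {
    public static void main(String[] args) {
        // prints the jar that org.apache.commons.cli.Option was loaded from
        System.out.println(org.apache.commons.cli.Option.class
                .getProtectionDomain().getCodeSource().getLocation());
    }
}
{code}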



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] Adding Azure Platform Support in DataStream, Table and SQL Connectors

2020-07-23 Thread Robert Metzger
Hi Israel,

thanks a lot for reaching out! I'm very excited about your efforts to bring
additional Azure support into Flink.
There are ~50 threads on the user@ mailing list mentioning Azure -- that's
good evidence that our users use Flink in Azure.

Regarding your questions:

Do I need to create a FLIP [2] in order to make these changes to bring the
> new capabilities or the individual JIRA issues are sufficient?


For the two DataLake FS tickets, I don't see the need for a FLIP.

I am thinking about targeting Flink versions 1.10 through 1.12
> For new connectors like this, how many versions can/should this be
> integrated into?


We generally don't add new features to old releases (unless there's a very
good reason to backport the feature). Therefore, the new integrations will
all go into the next major Flink release (in this case probably Flink 1.12
for the first tickets).

Are there any upcoming changes to supported Java and Scala versions that I
> need to be aware of?


No, I'm not aware of any upcoming changes. Java 8 and Java 11 are the two
versions we test against.

My goal is to add the first two features [FLINK-18562] and [FLINK-18568] to
> the existing file system capabilities [1] and then have the other
> connectors FLINK-1856[3-7] exist as standalone plugins.


I like the order in which you approach the tickets. I assigned you to the
first ticket and commented on the second one. I'm also willing to help
review your pull requests.

What do you mean by "standalone plugins" in the context of connectors?
Would you like to contribute these connectors to the main Flink codebase,
or maintain them outside Flink but list them in flink-packages.org?

Best,
Robert


On Wed, Jul 22, 2020 at 10:43 AM Israel Ekpo  wrote:

> I have opened the following issues to track new efforts to bring additional
> Azure Support in the following areas to the connectors ecosystem.
>
> My goal is to add the first two features [FLINK-18562] and [FLINK-18568] to
> the existing file system capabilities [1] and then have the other
> connectors FLINK-1856[3-7] exist as standalone plugins.
>
> As more users adopt the additional connectors, we could incrementally bring
> them into the core code base if necessary with sufficient support.
>
> I am new to the process so that I have a few questions:
>
> Do I need to create a FLIP [2] in order to make these changes to bring the
> new capabilities or the individual JIRA issues are sufficient?
>
> I am thinking about targeting Flink versions 1.10 through 1.12
> For new connectors like this, how many versions can/should this be
> integrated into?
>
> Are there any upcoming changes to supported Java and Scala versions that I
> need to be aware of?
>
> Any ideas or suggestions you have would be great.
>
> Below is a summary of the JIRA issues that were created to track the effort
>
> Add Support for Azure Data Lake Store Gen 2 in Flink File System
> https://issues.apache.org/jira/browse/FLINK-18562
>
> Add Support for Azure Data Lake Store Gen 2 in Streaming File Sink
> https://issues.apache.org/jira/browse/FLINK-18568
>
> Add Support for Azure Cosmos DB DataStream Connector
> https://issues.apache.org/jira/browse/FLINK-18563
>
> Add Support for Azure Event Hub DataStream Connector
> https://issues.apache.org/jira/browse/FLINK-18564
>
> Add Support for Azure Event Grid DataStream Connector
> https://issues.apache.org/jira/browse/FLINK-18565
>
> Add Support for Azure Cognitive Search DataStream Connector
> https://issues.apache.org/jira/browse/FLINK-18566
>
> Add Support for Azure Cognitive Search Table & SQL Connector
> https://issues.apache.org/jira/browse/FLINK-18567
>
>
> [1] https://github.com/apache/flink/tree/master/flink-filesystems
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
>


[jira] [Created] (FLINK-18680) Improve RecordsWithSplitIds API

2020-07-23 Thread Jiangjie Qin (Jira)
Jiangjie Qin created FLINK-18680:


 Summary: Improve RecordsWithSplitIds API
 Key: FLINK-18680
 URL: https://issues.apache.org/jira/browse/FLINK-18680
 Project: Flink
  Issue Type: Sub-task
  Components: Connectors / Common
Affects Versions: 1.11.1
Reporter: Jiangjie Qin


The current API requires some improvements in a few aspects:
 # The current API forces the {{SplitReaders}} to expose the records via a Java collection, 
which is not performance friendly in some cases.
 # Some information does not change across the records, but the current 
RecordsWithSplitIds data structure does not support per-batch information (an illustrative 
sketch of one possible direction follows below).
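
One purely illustrative direction (the interface and method names below are assumptions, 
not a final design) would be an iterator-style view over a batch that also carries 
per-batch information:

{code:java}
import java.util.Set;
import javax.annotation.Nullable;

// illustrative sketch only: records are pulled split by split instead of being
// materialized into a collection, and finishedSplits() exposes per-batch information
public interface RecordsBatchSketch<T> {

    // moves to the next split of this batch; null when the batch has no more splits
    @Nullable
    String nextSplit();

    // next record from the current split; null when this split is drained within the batch
    @Nullable
    T nextRecordFromSplit();

    // splits that were finished within this batch
    Set<String> finishedSplits();
}
{code}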



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: Flink Redis connectivity

2020-07-23 Thread Ramya Ramamurthy
Hi,

Thanks for your response.

I am trying to maintain some state in Redis; for each incoming packet, I map it against the
information in Redis, and then finally use ES as a sink to push the data.
But with this flink-connector-redis, I am not sure whether the same can be achieved.
Could you please elaborate on this? It would be very helpful.

Thank you.


On Wed, Jul 22, 2020 at 9:29 AM Yun Tang  wrote:

> Hi Ramya
>
> Have you ever tried flink-connector-redis<
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis>
> in bahir [1][2]? I think you could use it or obtain some insights.
>
> [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
> [2] https://github.com/apache/bahir-flink
>
> Best
> Yun Tang
>
> 
> From: Yangze Guo 
> Sent: Tuesday, July 21, 2020 18:50
> To: dev 
> Subject: Re: Flink Redis connectivity
>
> Hi,
>
> I think you could implement `RichMapFunction` and create `redisClient`
> in the `open` method.
>
> Best,
> Yangze Guo
>
> On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy 
> wrote:
> >
> > Hi,
> >
> > As per the understanding we have from the documentation, I guess its not
> > possible to take the redis connection within the Data Stream. In that
> case,
> > how should i proceed ? How can i access a DB client object within the
> > stream ??
> >
> > I am using Flink 1.7. any help here would be appreciated. Thanks.
> >
> >   RedisClient redisClient = new
> > RedisClient(RedisURI.create("redis://localhost:6379"));
> > RedisConnection client =
> > redisClient.connect();
> > DataStream parsedStream = ipdetailsStream.map((MapFunction > String>) value -> {
> >
> > String ct = value.getField(5).toString();
> >
> > String res = "";
> > if (ct.equals("14") || ct.equals("4")) {
> >
> > res = client.set("key", "val");
> > }
> > return res;
> > });
> >
> > Thanks,
>