Flink Table & SQL (Part 2): The Table API
Published: 2019-05-24


Continuing from the previous post, this article walks through basic usage of the Table API.
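Every example below parses comma-separated lines into an Entity POJO. The class itself is never shown in the original post; judging from the constructor calls and the getTimestamp() accessor used later, it presumably looks like the following sketch (field names are inferred, not confirmed):

public class Entity {
    // Fields inferred from new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]))
    private String name;
    private int country;
    private long timestamp;

    public Entity() {}  // Flink POJO types need a public no-arg constructor

    public Entity(String name, int country, long timestamp) {
        this.name = name;
        this.country = country;
        this.timestamp = timestamp;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getCountry() { return country; }
    public void setCountry(int country) { this.country = country; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}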

8、Table API

1、Scan, Projection, and Filter

(1) filter, isNotNull(), and, lowerCase(), as, count(), avg(), end(), start()
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file data source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map the source lines to a POJO and assign event-time timestamps/watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource
                .map(new MapFunction<String, Entity>() {
                    @Override
                    public Entity map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
                            @Override
                            public long extractTimestamp(Entity entity) {
                                return entity.getTimestamp();
                            }
                        });

        // 4. Convert the stream to a Table with an event-time attribute, then
        //    filter, project, and aggregate over a 10-second tumbling window
        Table select = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp").rowtime());
        Table table = select
                .filter(and(
                        $("name").isNotNull(),
                        $("country").isNotNull(),
                        $("timestamp").isNotNull()))
                .select($("name").lowerCase().as("a"), $("country"), $("timestamp"))
                .window(Tumble.over(lit(10).seconds()).on($("timestamp")).as("hourlyWindow"))
                .groupBy($("a"), $("country"), $("hourlyWindow"))
                .select($("a"), $("country").count(), $("country").avg(), $("hourlyWindow").end().as("hour"));

        // 5. Convert back to a retract stream and print to the console
        tEnv.toRetractStream(table, Row.class).print();
        env.execute("readFileCreateTableStream");
    }
}
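The input file is never shown either; reconstructing it from the retract-stream output printed in the column-operation examples below, D:\test\a.txt presumably contains lines such as:

GAOJIAN,14,1598512338687
gaojian,14,1598512338687
aaaaaaaa,13,1598512338687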
(2) where, isEqual, mod
// Package, imports, environment setup, file source, POJO mapping, and watermark
// assignment are identical to the full example in (1) above; only the table logic differs.

// 4. Convert the stream to a Table, then filter with where() and filter()
Table table = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
Table table2 = table.where($("name").isEqual("gaojian"));
Table table1 = table.filter($("country").mod(2).isEqual(0));

// 5. Convert to streams and print to the console
tEnv.toRetractStream(table1, Row.class).print();
tEnv.toRetractStream(table2, Entity.class).print();

env.execute("readFileCreateTableStream");

2、Column Operations

(1) AddColumns

Performs a field-add operation. If the added field already exists, an exception is thrown.

// Package, imports, environment setup, file source, POJO mapping, and watermark
// assignment are identical to the full example in section 1; additionally,
// org.apache.flink.api.java.tuple.Tuple2 and
// org.apache.flink.streaming.api.datastream.DataStream are imported here.

Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
orders.printSchema();  // print the original schema
System.out.println("======================================");

Table result = orders.addColumns(concat($("name"), "sunny").as("city"));
result.printSchema();  // print the schema after the column is added

DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(result, Row.class);
dataStream.print("add");
env.execute("readFileCreateTableStream");

Output of the program above:

root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
 |-- city: STRING
add> (true,GAOJIAN,14,1598512338687,GAOJIANsunny)
add> (true,gaojian,14,1598512338687,gaojiansunny)
add> (true,aaaaaaaa,13,1598512338687,aaaaaaaasunny)
(2) AddOrReplaceColumns

Performs a field add-or-replace operation. If the name of an added column matches an existing column, the existing field is replaced. Also, if the added fields themselves contain duplicate field names, the last one is used.

// Package, imports, and setup are identical to the AddColumns example above;
// only the table logic differs.

Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
orders.printSchema();  // print the original schema
System.out.println("======================================");

Table table = orders.addOrReplaceColumns(concat($("name"), "ADD").as("name"));
table.printSchema();  // print the schema after the column is added or replaced

DataStream<Tuple2<Boolean, Row>> dataStream1 = tEnv.toRetractStream(table, Row.class);
dataStream1.print("addOrReplace");
env.execute("readFileCreateTableStream");

Output of the program above:

root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
addOrReplace> (true,GAOJIANADD,14,1598512338687)
addOrReplace> (true,gaojianADD,14,1598512338687)
addOrReplace> (true,aaaaaaaaADD,13,1598512338687)
(3) DropColumns

Performs a field-drop operation. The field expressions should be field-reference expressions, and only existing fields can be dropped.

// Package, imports, and setup are identical to the AddColumns example above;
// only the table logic differs.

Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
orders.printSchema();  // print the original schema
System.out.println("======================================");

Table table = orders.dropColumns($("name"));
table.printSchema();  // print the schema after the name field is dropped

tEnv.toRetractStream(table, Row.class).print("drop");
env.execute("readFileCreateTableStream");

Output of the program above:

root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- country: INT
 |-- timestamp: BIGINT
drop> (true,14,1598512338687)
drop> (true,14,1598512338687)
drop> (true,13,1598512338687)
(4) RenameColumns

Performs a field-rename operation. The field expressions should be alias expressions, and only existing fields can be renamed.

// Package, imports, and setup are identical to the AddColumns example above;
// only the table logic differs.

Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
orders.printSchema();  // print the original schema
System.out.println("======================================");

Table renameColumns = orders.renameColumns($("name").as("myName"), $("country").as("my_country"));
renameColumns.printSchema();  // print the schema after the rename

tEnv.toRetractStream(renameColumns, Row.class).print();
env.execute("readFileCreateTableStream");

Output of the program above:

root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- myName: STRING
 |-- my_country: INT
 |-- timestamp: BIGINT
(true,GAOJIAN,14,1598512338687)
(true,gaojian,14,1598512338687)
(true,aaaaaaaa,13,1598512338687)

3、Aggregation

(1) GroupBy

For streaming queries, the state required to compute the query result may grow indefinitely, depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with a valid retention interval to prevent excessive state; a sketch of how to set one follows the example below.

Table select = orders.groupBy($("name")).select($("name"), $("country").count());
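The retention interval mentioned above can be supplied through the TableConfig. A minimal sketch, assuming the Flink 1.11-era API these examples appear to target (setIdleStateRetentionTime takes a minimum and a maximum idle time):

import org.apache.flink.api.common.time.Time;

// Sketch: state kept for a grouping key becomes eligible for cleanup once the
// key has been idle for longer than the configured retention time.
tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));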
(2) GroupBy Window

Groups and aggregates a table on a group window and possibly one or more grouping keys.

Table table = orders
        .window(Tumble.over(lit(5).seconds()).on($("ts")).as("w"))
        .groupBy($("name"), $("w"))
        .select(
                $("name"),
                $("w").start(),
                $("w").end(),
                $("w").rowtime(),
                $("country").sum().as("d"));
(3) Over Window

**Note:** All aggregates must be defined over the same window, i.e., the same partitioning, ordering, and range. Currently, only windows from PRECEDING (unbounded or bounded) to CURRENT ROW are supported; ranges with FOLLOWING are not supported yet. ORDER BY must be specified on a single time attribute.

Table table = orders
        .window(Over
                .partitionBy($("name"))
                .orderBy($("timestamp"))      // must be a time attribute
                .preceding(UNBOUNDED_RANGE)   // unbounded or bounded preceding
                .following(CURRENT_RANGE)     // ranges with FOLLOWING are not supported
                .as("w"))
        .select(
                $("name"),
                $("country").avg().over($("w")),
                $("country").max().over($("w")),
                $("country").min().over($("w")));
(4) Distinct Aggregation

Example 1: distinct aggregation on a grouped table:

Table table = orders
        .groupBy($("name"))
        .select($("name"), $("country").sum().distinct().as("d"));

Example 2: distinct aggregation in a group window:

Table table = orders
        .window(Tumble.over(lit(5).seconds())
                .on($("timestamp"))
                .as("w"))
        .groupBy($("name"), $("w"))
        .select($("name"), $("country").sum().distinct().as("d"));

Example 3: distinct aggregation over an over window (see the sketch below).
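The snippet for this case was lost in extraction; following the pattern of the two examples above and assuming $("timestamp") is a rowtime attribute as in the earlier event-time examples, it presumably looks like this sketch:

// A sketch, not from the original post: distinct aggregation over an over window.
Table table = orders
        .window(Over
                .partitionBy($("name"))
                .orderBy($("timestamp"))
                .preceding(UNBOUNDED_RANGE)
                .as("w"))
        .select($("name"), $("country").sum().distinct().over($("w")).as("d"));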
(5) Distinct

**Note:** For streaming queries, the state required to compute the query result may grow indefinitely, depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state. If state cleanup is enabled, distinct has to emit messages to prevent downstream operators from evicting state too early, so the distinct results are emitted as updates.

Table orders = tableEnv.from("Orders");
Table result = orders.distinct();

4、Joins

(1) Inner Join
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {

    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the two file data sources
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");
        DataStreamSource<String> streamSource01 = env.readTextFile("D:\\test\\b.txt");

        // 3. Map the left source to the Entity POJO with event-time watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource
                .map(new MapFunction<String, Entity>() {
                    @Override
                    public Entity map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
                    }
                })
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
                            @Override
                            public long extractTimestamp(Entity entity) {
                                return entity.getTimestamp();
                            }
                        });

        // Map the right source to the POJO class
        SingleOutputStreamOperator<POJO> streamOperator01 = streamSource01
                .map(new MapFunction<String, POJO>() {
                    @Override
                    public POJO map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new POJO(split[0], split[1]);
                    }
                });

        // Convert both streams to Tables
        Table left = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
        Table right = tEnv.fromDataStream(streamOperator01, $("rname"), $("sex"));

        // Join the two tables
        Table table = left.join(right)
                .where($("name").isEqual($("rname")))
                .select($("name"), $("country"), $("sex"));

        tEnv.toRetractStream(table, Row.class).print();
        env.execute("readFileCreateTableStream");
    }
}
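The right-hand stream is mapped to a second POJO class, POJO, which is also not shown in the original. From the constructor call new POJO(split[0], split[1]) and the field references $("rname") and $("sex"), it presumably looks like this sketch:

public class POJO {
    // Field names inferred from the $("rname") / $("sex") references above
    private String rname;
    private String sex;

    public POJO() {}  // required no-arg constructor for Flink POJO types

    public POJO(String rname, String sex) {
        this.rname = rname;
        this.sex = sex;
    }

    public String getRname() { return rname; }
    public void setRname(String rname) { this.rname = rname; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
}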
(2) Outer Join
Table left = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataStream(streamOperator01, $("rname"), $("sex"));

Table leftJoin = left.leftOuterJoin(right, $("name").isEqual($("rname")))
        .select($("name"), $("country"), $("sex"));
Table rightJoin = left.rightOuterJoin(right, $("name").isEqual($("rname")))
        .select($("name"), $("country"), $("sex"));
Table fullJoin = left.fullOuterJoin(right, $("name").isEqual($("rname")))
        .select($("name"), $("country"), $("sex"));

tEnv.toRetractStream(leftJoin, Row.class).print("left");
tEnv.toRetractStream(rightJoin, Row.class).print("right");
tEnv.toRetractStream(fullJoin, Row.class).print("full");

5、Set Operations (note: these all use the batch DataSet API, except In, which supports batch and streaming)

(1) Union

Similar to a SQL UNION clause. Unions two tables with duplicate records removed. Both tables must have identical field types.

package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {

    public static void main(String[] args) throws Exception {
        // 1. Batch environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // 2. Read the two file data sources
        DataSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");
        DataSource<String> streamSource01 = env.readTextFile("D:\\test\\b.txt");

        // 3. Map both sources to the Entity POJO
        MapOperator<String, Entity> mapOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        });
        MapOperator<String, Entity> mapOperator1 = streamSource01.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        });

        // 4. Convert the DataSets to Tables
        Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
        Table right = tEnv.fromDataSet(mapOperator1, $("name"), $("country"), $("timestamp"));

        // 5. Union the two tables (duplicates removed)
        Table result = left.union(right).select($("name"));

        // Convert back to a DataSet and print; print() itself triggers execution
        // in the DataSet API, so no separate env.execute() call is needed
        tEnv.toDataSet(result, Row.class).print();
    }
}
(2) UnionAll

Similar to a SQL UNION ALL clause. Unions two tables. Both tables must have identical field types.

// 4. Convert the DataSets to Tables
Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataSet(mapOperator1, $("name"), $("country"), $("timestamp"));

// 5. Union the two tables, keeping duplicates
Table result = left.unionAll(right);
(3) Intersect

Similar to a SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present more than once in one or both tables, it is returned only once, i.e., the resulting table has no duplicate records. Both tables must have identical field types.

// 4. Convert the DataSets to Tables
Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataSet(mapOperator1, $("name"), $("country"), $("timestamp"));

// 5. Intersect the two tables
Table result = left.intersect(right);
(4) IntersectAll

Similar to a SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present in both tables more than once, it is returned as many times as it appears in both tables, i.e., the resulting table may have duplicate records. Both tables must have identical field types.

// Intersect the two tables, keeping duplicates
Table result = left.intersectAll(right);
(5) Minus

Similar to a SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.

Test data:

left table:
gaojian,13,400000
shenyang,14,400000
shenyang,15,400000
shenyang,15,400000
shenadyang,15,400000

right table:
gaojian,13,400000
shenyang,14,400000
shenyang,15,400000
Table result = left.minus(right);

Result:

shenadyang,15,400000

(6) MinusAll

Similar to a SQL EXCEPT ALL clause. MinusAll returns records that do not exist in the right table. A record that appears n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as exist in the right table are removed. Both tables must have identical field types.

Table result = left.minusAll(right);

Result:

shenyang,15,400000
shenadyang,15,400000

(7) In (batch and stream)

Similar to a SQL IN clause. In returns true if the expression exists in the given table sub-query. The sub-query table must consist of a single column, and that column must have the same data type as the expression.

// 4. Convert the DataSets to Tables
Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataSet(mapOperator1, $("name"));

Table result = left
        .select($("name"), $("country"), $("timestamp"))
        .where($("name").in(right));

6、OrderBy, Offset & Fetch

(1) OrderBy | batch

Similar to a SQL ORDER BY clause. Returns records globally sorted across all parallel partitions.

Table result = left.orderBy($("timestamp").asc());   // ascending
Table result = left.orderBy($("timestamp").desc());  // descending
(2) Offset & Fetch | batch

Offset and fetch limit the number of records returned from a sorted result. They are technically part of the OrderBy operator and therefore must be preceded by it.

Table result = left.orderBy($("timestamp").asc()).fetch(3);            // return the first 3 records of the sorted result
Table result = left.orderBy($("timestamp").asc()).offset(2);           // skip the first 2 records and return everything after them
Table result = left.orderBy($("timestamp").asc()).offset(2).fetch(1);  // skip the first 2 records and return the next 1 record

7、Insert

Similar to the INSERT INTO clause of a SQL query, this method performs an insert into a registered output table. The executeInsert() method immediately submits the Flink job that performs the insert.

Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");
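executeInsert() expects the target table to already be registered in the catalog. A minimal sketch of one way to register it, assuming Flink 1.11's executeSql() and the built-in print connector (the schema here is a placeholder, not from the original post):

// Hypothetical sink registration; adjust the schema to match the query.
tableEnv.executeSql(
        "CREATE TABLE OutOrders (name STRING, country INT) " +
        "WITH ('connector' = 'print')");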

Reprinted from: http://vimzi.baihongyu.com/
