Following up on the previous post, this article walks through the basic usage of the Table API.
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map the source to a POJO and assign timestamps/watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
            @Override
            public long extractTimestamp(Entity entity) {
                return entity.getTimestamp();
            }
        });

        // 4. Convert the stream to a Table with an event-time attribute, then window and aggregate
        Table select = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp").rowtime());
        Table table = select.filter(and(
                        $("name").isNotNull(),
                        $("country").isNotNull(),
                        $("timestamp").isNotNull()))
                .select($("name").lowerCase().as("a"), $("country"), $("timestamp"))
                .window(Tumble.over(lit(10).seconds()).on($("timestamp")).as("hourlyWindow"))
                .groupBy($("a"), $("country"), $("hourlyWindow"))
                .select($("a"), $("country").count(), $("country").avg(), $("hourlyWindow").end().as("hour"));

        // 5. Convert back to a stream and print to the console
        tEnv.toRetractStream(table, Row.class).print();
        env.execute("readFileCreateTableStream");
    }
}
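The Entity POJO used throughout these examples is not shown in the original post. A minimal sketch consistent with how it is used (a name string, an integer country code, and a millisecond timestamp; the field names are inferred from the code, not confirmed) might look like this:

// A sketch of the Entity POJO assumed by the examples above and below;
// field names are inferred from how the class is used.
public class Entity {
    private String name;
    private int country;
    private long timestamp;

    public Entity() {} // Flink POJOs need a public no-arg constructor

    public Entity(String name, int country, long timestamp) {
        this.name = name;
        this.country = country;
        this.timestamp = timestamp;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getCountry() { return country; }
    public void setCountry(int country) { this.country = country; }
    public long getTimestamp() { return timestamp; }
    public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
}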
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map the source to a POJO and assign timestamps/watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
            @Override
            public long extractTimestamp(Entity entity) {
                return entity.getTimestamp();
            }
        });

        // 4. Convert the stream to a Table, then filter with where()/filter()
        Table table = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
        Table table2 = table.where($("name").isEqual("gaojian"));
        Table table1 = table.filter($("country").mod(2).isEqual(0));

        // 5. Convert back to streams and print to the console
        tEnv.toRetractStream(table1, Row.class).print();
        tEnv.toRetractStream(table2, Entity.class).print();
        env.execute("readFileCreateTableStream");
    }
}
Performs a column-add operation (addColumns). If a field being added already exists, it throws an exception.
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map the source to a POJO and assign timestamps/watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
            @Override
            public long extractTimestamp(Entity entity) {
                return entity.getTimestamp();
            }
        });

        // 4. Convert the stream to a Table
        Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
        orders.printSchema(); // print the original schema
        System.out.println("======================================");

        Table result = orders.addColumns(concat($("name"), "sunny").as("city"));
        result.printSchema(); // print the schema after adding the column

        DataStream<Tuple2<Boolean, Row>> dataStream = tEnv.toRetractStream(result, Row.class);
        dataStream.print("add");
        env.execute("readFileCreateTableStream");
    }
}
Output of the above program:
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
 |-- city: STRING
add> (true,GAOJIAN,14,1598512338687,GAOJIANsunny)
add> (true,gaojian,14,1598512338687,gaojiansunny)
add> (true,aaaaaaaa,13,1598512338687,aaaaaaaasunny)
Performs an add-or-replace column operation (addOrReplaceColumns). If an added column has the same name as an existing column, the existing field is replaced. Moreover, if the added fields contain duplicate field names, the last one is used.
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map the source to a POJO and assign timestamps/watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
            @Override
            public long extractTimestamp(Entity entity) {
                return entity.getTimestamp();
            }
        });

        // 4. Convert the stream to a Table
        Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
        orders.printSchema(); // print the original schema
        System.out.println("======================================");

        Table table = orders.addOrReplaceColumns(concat($("name"), "ADD").as("name"));
        table.printSchema(); // print the schema after replacing the column

        DataStream<Tuple2<Boolean, Row>> dataStream1 = tEnv.toRetractStream(table, Row.class);
        dataStream1.print("addOrReplace");
        env.execute("readFileCreateTableStream");
    }
}
Output of the above program:
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
addOrReplace> (true,GAOJIANADD,14,1598512338687)
addOrReplace> (true,gaojianADD,14,1598512338687)
addOrReplace> (true,aaaaaaaaADD,13,1598512338687)
Performs a column-drop operation (dropColumns). Field expressions must be field reference expressions, and only existing fields can be dropped.
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map the source to a POJO and assign timestamps/watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
            @Override
            public long extractTimestamp(Entity entity) {
                return entity.getTimestamp();
            }
        });

        // 4. Convert the stream to a Table
        Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
        orders.printSchema(); // print the original schema
        System.out.println("======================================");

        Table table = orders.dropColumns($("name"));
        table.printSchema(); // print the schema after dropping the name column

        tEnv.toRetractStream(table, Row.class).print("drop");
        env.execute("readFileCreateTableStream");
    }
}
Output of the above program:
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- country: INT
 |-- timestamp: BIGINT
drop> (true,14,1598512338687)
drop> (true,14,1598512338687)
drop> (true,13,1598512338687)
Performs a column-rename operation (renameColumns). Field expressions must be alias expressions, and only existing fields can be renamed.
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file source
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");

        // 3. Map the source to a POJO and assign timestamps/watermarks
        SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
            @Override
            public long extractTimestamp(Entity entity) {
                return entity.getTimestamp();
            }
        });

        // 4. Convert the stream to a Table
        Table orders = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
        orders.printSchema(); // print the original schema
        System.out.println("======================================");

        Table renameColumns = orders.renameColumns($("name").as("myName"), $("country").as("my_country"));
        renameColumns.printSchema(); // print the schema after renaming

        tEnv.toRetractStream(renameColumns, Row.class).print();
        env.execute("readFileCreateTableStream");
    }
}
Output of the above program:
root
 |-- name: STRING
 |-- country: INT
 |-- timestamp: BIGINT
======================================
root
 |-- myName: STRING
 |-- my_country: INT
 |-- timestamp: BIGINT
(true,GAOJIAN,14,1598512338687)
(true,gaojian,14,1598512338687)
(true,aaaaaaaa,13,1598512338687)
For streaming queries, the state required to compute the query result can grow indefinitely, depending on the type of aggregation and the number of distinct grouping keys. Please provide a query configuration with a valid retention interval to prevent excessive state; one way to configure this is sketched after the example below.
Table select = orders.groupBy($("name")).select($("name"), $("country").count());
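A sketch of one way to set such a retention interval, assuming the 1.11-era TableConfig API used by the rest of this post (the concrete retention values here are illustrative only):

import org.apache.flink.api.common.time.Time;

// Sketch: state for a grouping key that has been idle longer than the
// maximum retention time becomes eligible for cleanup.
tEnv.getConfig().setIdleStateRetentionTime(Time.hours(12), Time.hours(24));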
Groups and aggregates a table over a group window, and possibly one or more grouping keys.
Table table = orders
    .window(Tumble.over(lit(5).seconds()).on($("timestamp")).as("w")) // "timestamp" must be a time attribute
    .groupBy($("name"), $("w"))
    .select(
        $("name"),
        $("w").start(),
        $("w").end(),
        $("w").rowtime(),
        $("country").sum().as("d")
    );
**Note:** All aggregates must be defined over the same window, i.e., the same partitioning, ordering, and range. Currently, only windows from PRECEDING (unbounded or bounded) to CURRENT ROW are supported; ranges with FOLLOWING are not yet supported. ORDER BY must be specified on a single time attribute.
Table table = orders
    .window(
        Over
            .partitionBy($("name"))
            .orderBy($("timestamp"))    // must be ordered on a time attribute
            .preceding(UNBOUNDED_RANGE) // unbounded or bounded PRECEDING is supported
            .following(CURRENT_RANGE)   // FOLLOWING beyond the current row/range is not supported
            .as("w"))
    .select(
        $("name"),
        $("country").avg().over($("w")),
        $("country").max().over($("w")),
        $("country").min().over($("w"))
    );
1. Distinct aggregation on a grouped table:
Table table = orders
    .groupBy($("name"))
    .select($("name"), $("country").sum().distinct().as("d"));
2. Distinct aggregation on a group window:
Table table = orders
    .window(Tumble.over(lit(5).seconds())
        .on($("timestamp"))
        .as("w"))
    .groupBy($("name"), $("w"))
    .select($("name"), $("country").sum().distinct().as("d"));
3. Distinct rows, similar to SQL's SELECT DISTINCT:
**Note:** For streaming queries, the state required to compute the query result can grow indefinitely, depending on the number of distinct fields. Please provide a query configuration with a valid retention interval to prevent excessive state. If state cleanup is enabled, distinct has to emit messages to prevent downstream operators from evicting state too early, which means the distinct result is itself updating.
Table orders = tableEnv.from("Orders");
Table result = orders.distinct();
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Streaming environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 2. Read the file sources
        DataStreamSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");
        DataStreamSource<String> streamSource01 = env.readTextFile("D:\\test\\b.txt");

        // 3. Map the sources to POJOs
        SingleOutputStreamOperator<Entity> streamOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Entity>(Time.milliseconds(1000 * 60)) {
            @Override
            public long extractTimestamp(Entity entity) {
                return entity.getTimestamp();
            }
        });
        SingleOutputStreamOperator<POJO> streamOperator01 = streamSource01.map(new MapFunction<String, POJO>() {
            @Override
            public POJO map(String s) throws Exception {
                String[] split = s.split(",");
                return new POJO(split[0], split[1]);
            }
        });

        // 4. Convert the streams to Tables
        Table left = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
        Table right = tEnv.fromDataStream(streamOperator01, $("rname"), $("sex"));

        // 5. Join the tables (similar to a SQL INNER JOIN; field names must not overlap)
        Table table = left.join(right)
                .where($("name").isEqual($("rname")))
                .select($("name"), $("country"), $("sex"));

        tEnv.toRetractStream(table, Row.class).print();
        env.execute("readFileCreateTableStream");
    }
}
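As with Entity, the POJO class used for the right-hand stream is not shown in the original post. A minimal sketch matching its usage (assumed fields rname and sex) could be:

// A sketch of the POJO class assumed by the join examples;
// field names are inferred from the table expressions above.
public class POJO {
    private String rname;
    private String sex;

    public POJO() {} // public no-arg constructor required for Flink POJOs

    public POJO(String rname, String sex) {
        this.rname = rname;
        this.sex = sex;
    }

    public String getRname() { return rname; }
    public void setRname(String rname) { this.rname = rname; }
    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }
}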
Table left = tEnv.fromDataStream(streamOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataStream(streamOperator01, $("rname"), $("sex"));

Table leftJoin = left.leftOuterJoin(right, $("name").isEqual($("rname")))
        .select($("name"), $("country"), $("sex"));
Table rightJoin = left.rightOuterJoin(right, $("name").isEqual($("rname")))
        .select($("name"), $("country"), $("sex"));
Table fullJoin = left.fullOuterJoin(right, $("name").isEqual($("rname")))
        .select($("name"), $("country"), $("sex"));

tEnv.toRetractStream(leftJoin, Row.class).print("left");
tEnv.toRetractStream(rightJoin, Row.class).print("right");
tEnv.toRetractStream(fullJoin, Row.class).print("full");
Similar to the SQL UNION clause. Unions two tables and removes duplicate records. Both tables must have identical field types.
package com.flink.sql.environment.tableAPI;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

public class ProcessTime {
    public static void main(String[] args) throws Exception {
        // 1. Batch environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        // 2. Read the file sources
        DataSource<String> streamSource = env.readTextFile("D:\\test\\a.txt");
        DataSource<String> streamSource01 = env.readTextFile("D:\\test\\b.txt");

        // 3. Map the sources to POJOs
        MapOperator<String, Entity> mapOperator = streamSource.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        });
        MapOperator<String, Entity> mapOperator1 = streamSource01.map(new MapFunction<String, Entity>() {
            @Override
            public Entity map(String s) throws Exception {
                String[] split = s.split(",");
                return new Entity(split[0], Integer.parseInt(split[1]), Long.parseLong(split[2]));
            }
        });

        // 4. Convert the DataSets to Tables
        Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
        Table right = tEnv.fromDataSet(mapOperator1, $("name"), $("country"), $("timestamp"));

        // 5. Union the two tables
        Table result = left.union(right).select($("name"));

        // Convert and print to the console. DataSet.print() triggers execution
        // itself, so a trailing env.execute() would fail with
        // "No new data sinks have been defined" and is omitted here.
        tEnv.toDataSet(result, Row.class).print();
    }
}
Similar to the SQL UNION ALL clause. Unions two tables without removing duplicates. Both tables must have identical field types.
// 4. Convert the DataSets to Tables
Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataSet(mapOperator1, $("name"), $("country"), $("timestamp"));
// 5. Union the two tables, keeping duplicates
Table result = left.unionAll(right);
Similar to the SQL INTERSECT clause. Intersect returns records that exist in both tables. If a record is present more than once in one or both tables, it is returned only once, i.e., the result table has no duplicate records. Both tables must have identical field types.
// 4. Convert the DataSets to Tables
Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataSet(mapOperator1, $("name"), $("country"), $("timestamp"));
// 5. Intersect the two tables
Table result = left.intersect(right);
Similar to the SQL INTERSECT ALL clause. IntersectAll returns records that exist in both tables. If a record is present more than once in both tables, it is returned as many times as it is present in both tables, i.e., the result table may contain duplicate records. Both tables must have identical field types.
// 5. Intersect the two tables, keeping duplicates
Table result = left.intersectAll(right);
Similar to the SQL EXCEPT clause. Minus returns records from the left table that do not exist in the right table. Duplicate records in the left table are returned exactly once, i.e., duplicates are removed. Both tables must have identical field types.
Test data:
left table:
gaojian,13,400000
shenyang,14,400000
shenyang,15,400000
shenyang,15,400000
shenadyang,15,400000

right table:
gaojian,13,400000
shenyang,14,400000
shenyang,15,400000
Table result = left.minus(right);
Result:
shenadyang,15,400000
Similar to the SQL EXCEPT ALL clause. MinusAll returns records that do not exist in the right table. A record that is present n times in the left table and m times in the right table is returned (n - m) times, i.e., as many duplicates as are present in the right table are removed. Both tables must have identical field types.
Table result = left.minusAll(right);
Result:
shenyang,15,400000
shenadyang,15,400000

(shenyang,15,400000 appears twice in the left table and once in the right table, so it is returned 2 - 1 = 1 time; shenadyang,15,400000 appears once on the left and never on the right.)

Similar to the SQL IN clause. In returns true if an expression exists in a given table sub-query. The sub-query table must consist of a single column, and that column must have the same data type as the expression.
// 4. Convert the DataSets to Tables
Table left = tEnv.fromDataSet(mapOperator, $("name"), $("country"), $("timestamp"));
Table right = tEnv.fromDataSet(mapOperator1, $("name"));

Table result = left
        .select($("name"), $("country"), $("timestamp"))
        .where($("name").in(right));
Similar to the SQL ORDER BY clause. Returns records sorted globally across all parallel partitions.
Table result = left.orderBy($("timestamp").asc());  // ascending
Table result = left.orderBy($("timestamp").desc()); // descending
Similar to the SQL OFFSET and FETCH clauses. Offset and fetch limit the number of records returned from a sorted result. They are technically part of the Order By operator and must therefore be preceded by it.
Table result = left.orderBy($("timestamp").asc()).fetch(3); //返回排序结果的前3条记录
Table result = left.orderBy($("timestamp").asc()).offset(2); //跳过前2条记录,并从排序结果中返回所有随后的记录
Table result = left.orderBy($("timestamp").asc()).offset(2).fetch(1); //跳过前2条记录,并从排序结果中返回后1条记录
Similar to the INSERT INTO clause in a SQL query, this method performs an insertion into a registered output table. The executeInsert() method immediately submits a Flink job that executes the insert operation.
Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");
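executeInsert assumes the output table is already registered. A sketch of registering such a sink with DDL follows; the connector, path, format, and schema here are illustrative assumptions, not taken from the original post:

// Hypothetical sink registration before calling executeInsert("OutOrders");
// connector, path, and format are placeholders.
tableEnv.executeSql(
    "CREATE TABLE OutOrders (" +
    "  name STRING," +
    "  country INT," +
    "  `timestamp` BIGINT" +
    ") WITH (" +
    "  'connector' = 'filesystem'," +
    "  'path' = 'D:/test/out'," +
    "  'format' = 'csv'" +
    ")");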
Reposted from: http://vimzi.baihongyu.com/