SQL查询工具
workspace: 这里使用虚拟清单(virtual manifest)方式
虚拟清单
若一个 Cargo.toml 有 [workspace] 但是没有 [package] 部分,则它是虚拟清单类型的工作空间。
对于没有主 package 的场景或你希望将所有的 package 组织在单独的目录中时,这种方式就非常适合。
workspace关键点
- 所有的 package 共享同一个 Cargo.lock 文件,该文件位于工作空间的根目录中
- 所有的 package 共享同一个输出目录,该目录默认的名称是 target ,位于工作空间根目录下
- 只有工作空间根目录的 Cargo.toml 才能包含 [patch], [replace] 和 [profile.*],而成员的 Cargo.toml 中的相应部分将被自动忽略
workspace使用方式
cargo run -p <member package>
cargo build -p queryer
使用说明
- 在工作空间中,package 相关的 Cargo 命令(例如 cargo build)可以使用 -p、--package 或 --workspace 命令行参数来指定想要操作的 package。
- 若没有指定任何参数,则 Cargo 将使用当前工作目录中的 package。若工作目录是虚拟清单类型的工作空间,则该命令将作用在所有成员上(就好像是使用了 --workspace 命令行参数)。而 default-members 可以在命令行参数没有被提供时,手动指定操作的成员。
queryer package
cargo.toml
两个使用示例
- dialect.rs:SQL解析
use sqlparser::{dialect::GenericDialect, parser::Parser};

/// Parses a sample SQL statement with the generic dialect and pretty-prints
/// the parse result (either the AST or the parser error).
fn main() {
    tracing_subscriber::fmt::init();

    let dialect = GenericDialect::default();
    let statement = "SELECT a a1, b, 123, myfunc(b), * \
                     FROM data_source \
                     WHERE a > b AND b < 100 AND c BETWEEN 10 AND 20 \
                     ORDER BY a DESC, b \
                     LIMIT 50 OFFSET 10";

    let parsed = Parser::parse_sql(&dialect, statement);
    println!("{:#?}", parsed);
}
- covid.rs: AST转换
use anyhow::Result; use queryer::query; #[tokio::main] async fn main() -> Result<()> { tracing_subscriber::fmt::init(); let url = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/latest/owid-covid-latest.csv"; // 使用 sql 从 URL 里获取数据 let sql = format!( "SELECT location name, total_cases, new_cases, total_deaths, new_deaths \ FROM {} where new_deaths >= 500 ORDER BY new_cases DESC", url ); let df1 = query(sql).await?; println!("{:?}", df1); Ok(()) }
src/convert.rs
结构体定义:sql与对应部分结构体, 注意受限于孤儿规则(orphan rule)需要再包装一层
/// The parsed SQL statement, reduced to the parts this crate works with.
pub struct Sql<'a> {
    /// Expressions converted from the SELECT projection list.
    pub(crate) selection: Vec<Expr>,
    /// Expression converted from the WHERE clause, if there is one.
    pub(crate) condition: Option<Expr>,
    /// Name of the single data source in the FROM clause.
    pub(crate) source: &'a str,
    /// ORDER BY entries as `(column name, flag)` pairs; the flag is the negation
    /// of the parsed `asc` setting, i.e. `true` for descending order.
    pub(crate) order_by: Vec<(String, bool)>,
    /// OFFSET value, if present.
    pub(crate) offset: Option<i64>,
    /// LIMIT value, if present.
    pub(crate) limit: Option<usize>,
}

// Because of Rust's orphan rule, implementing an existing trait for an existing
// type requires wrapping that type in a thin local newtype first.
pub struct Expression(pub(crate) Box<SqlExpr>);
pub struct Operation(pub(crate) SqlBinaryOperator);
pub struct Projection<'a>(pub(crate) &'a SelectItem);
pub struct Source<'a>(pub(crate) &'a [TableWithJoins]);
pub struct Order<'a>(pub(crate) &'a OrderByExpr);
pub struct Offset<'a>(pub(crate) &'a SqlOffset);
pub struct Limit<'a>(pub(crate) &'a SqlExpr);
pub struct Value(pub(crate) SqlValue);
sql的转换
/// 把 SqlParser 解析出来的 Statement 转换成我们需要的结构 impl<'a> TryFrom<&'a Statement> for Sql<'a> { type Error = anyhow::Error; fn try_from(sql: &'a Statement) -> Result<Self, Self::Error> { match sql { // 目前我们只关心 query (select ... from ... where ...) Statement::Query(q) => { let offset = q.offset.as_ref(); let limit = q.limit.as_ref(); let orders = &q.order_by; let Select { from: table_with_joins, selection: where_clause, projection, group_by: _, .. } = match &q.body { SetExpr::Select(statement) => statement.as_ref(), _ => return Err(anyhow!("We only support Select Query at the moment")), }; let source = Source(table_with_joins).try_into()?; let condition = match where_clause { Some(expr) => Some(Expression(Box::new(expr.to_owned())).try_into()?), None => None, }; let mut selection = Vec::with_capacity(8); for p in projection { let expr = Projection(p).try_into()?; selection.push(expr); } let mut order_by = Vec::new(); for expr in orders { order_by.push(Order(expr).try_into()?); } let offset = offset.map(|v| Offset(v).into()); let limit = limit.map(|v| Limit(v).into()); Ok(Sql { selection, condition, source, order_by, offset, limit, }) } _ => Err(anyhow!("We only support Query at the moment")), } } }
对应部分结构体的转换
/// 把 SqlParser 的 Expr 转换成 DataFrame 的 Expr impl TryFrom<Expression> for Expr { type Error = anyhow::Error; fn try_from(expr: Expression) -> Result<Self, Self::Error> { match *expr.0 { SqlExpr::BinaryOp { left, op, right } => Ok(Expr::BinaryExpr { left: Box::new(Expression(left).try_into()?), op: Operation(op).try_into()?, right: Box::new(Expression(right).try_into()?), }), SqlExpr::Wildcard => Ok(Self::Wildcard), SqlExpr::IsNull(expr) => Ok(Self::IsNull(Box::new(Expression(expr).try_into()?))), SqlExpr::IsNotNull(expr) => Ok(Self::IsNotNull(Box::new(Expression(expr).try_into()?))), SqlExpr::Identifier(id) => Ok(Self::Column(Arc::new(id.value))), SqlExpr::Value(v) => Ok(Self::Literal(Value(v).try_into()?)), v => Err(anyhow!("expr {:#?} is not supported", v)), } } } /// 把 SqlParser 的 BinaryOperator 转换成 DataFrame 的 Operator impl TryFrom<Operation> for Operator { type Error = anyhow::Error; fn try_from(op: Operation) -> Result<Self, Self::Error> { match op.0 { SqlBinaryOperator::Plus => Ok(Self::Plus), SqlBinaryOperator::Minus => Ok(Self::Minus), SqlBinaryOperator::Multiply => Ok(Self::Multiply), SqlBinaryOperator::Divide => Ok(Self::Divide), SqlBinaryOperator::Modulo => Ok(Self::Modulus), SqlBinaryOperator::Gt => Ok(Self::Gt), SqlBinaryOperator::Lt => Ok(Self::Lt), SqlBinaryOperator::GtEq => Ok(Self::GtEq), SqlBinaryOperator::LtEq => Ok(Self::LtEq), SqlBinaryOperator::Eq => Ok(Self::Eq), SqlBinaryOperator::NotEq => Ok(Self::NotEq), SqlBinaryOperator::And => Ok(Self::And), SqlBinaryOperator::Or => Ok(Self::Or), v => Err(anyhow!("Operator {} is not supported", v)), } } } /// 把 SqlParser 的 SelectItem 转换成 DataFrame 的 Expr impl<'a> TryFrom<Projection<'a>> for Expr { type Error = anyhow::Error; fn try_from(p: Projection<'a>) -> Result<Self, Self::Error> { match p.0 { SelectItem::UnnamedExpr(SqlExpr::Identifier(id)) => Ok(col(&id.to_string())), SelectItem::ExprWithAlias { expr: SqlExpr::Identifier(id), alias, } => Ok(Expr::Alias( 
Box::new(Expr::Column(Arc::new(id.to_string()))), Arc::new(alias.to_string()), )), SelectItem::QualifiedWildcard(v) => Ok(col(&v.to_string())), SelectItem::Wildcard => Ok(col("*")), item => Err(anyhow!("projection {} not supported", item)), } } } impl<'a> TryFrom<Source<'a>> for &'a str { type Error = anyhow::Error; fn try_from(source: Source<'a>) -> Result<Self, Self::Error> { if source.0.len() != 1 { return Err(anyhow!("We only support single data source at the moment")); } let table = &source.0[0]; if !table.joins.is_empty() { return Err(anyhow!("We do not support joint data source at the moment")); } match &table.relation { TableFactor::Table { name, .. } => Ok(&name.0.first().unwrap().value), _ => Err(anyhow!("We only support table")), } } } /// 把 SqlParser 的 order by expr 转换成 (列名, 排序方法) impl<'a> TryFrom<Order<'a>> for (String, bool) { type Error = anyhow::Error; fn try_from(o: Order) -> Result<Self, Self::Error> { let name = match &o.0.expr { SqlExpr::Identifier(id) => id.to_string(), expr => { return Err(anyhow!( "We only support identifier for order by, got {}", expr )) } }; Ok((name, !o.0.asc.unwrap_or(true))) } } /// 把 SqlParser 的 offset expr 转换成 i64 impl<'a> From<Offset<'a>> for i64 { fn from(offset: Offset) -> Self { match offset.0 { SqlOffset { value: SqlExpr::Value(SqlValue::Number(v, _b)), .. 
} => v.parse().unwrap_or(0), _ => 0, } } } /// 把 SqlParser 的 Limit expr 转换成 usize impl<'a> From<Limit<'a>> for usize { fn from(l: Limit<'a>) -> Self { match l.0 { SqlExpr::Value(SqlValue::Number(v, _b)) => v.parse().unwrap_or(usize::MAX), _ => usize::MAX, } } } /// 把 SqlParser 的 value 转换成 DataFrame 支持的 LiteralValue impl TryFrom<Value> for LiteralValue { type Error = anyhow::Error; fn try_from(v: Value) -> Result<Self, Self::Error> { match v.0 { SqlValue::Number(v, _) => Ok(LiteralValue::Float64(v.parse().unwrap())), SqlValue::Boolean(v) => Ok(LiteralValue::Boolean(v)), SqlValue::Null => Ok(LiteralValue::Null), v => Err(anyhow!("Value {} is not supported", v)), } } }
单元测试
#[cfg(test)]
mod tests {
    use super::*;
    use crate::TyrDialect;
    use sqlparser::parser::Parser;

    /// A complete select statement against a URL source converts into the
    /// expected `Sql` fields.
    #[test]
    fn parse_sql_works() {
        let url = "http://abc.xyz/abc?a=1&b=2";
        let raw = format!(
            "select a, b, c from {} where a=1 order by c desc limit 5 offset 10",
            url
        );

        let statements = Parser::parse_sql(&TyrDialect::default(), &raw).unwrap();
        let sql = Sql::try_from(&statements[0]).unwrap();

        assert_eq!(sql.source, url);
        assert_eq!(sql.limit, Some(5));
        assert_eq!(sql.offset, Some(10));
        assert_eq!(sql.order_by, vec![("c".into(), true)]);
        assert_eq!(sql.selection, vec![col("a"), col("b"), col("c")]);
    }
}
src/dialect.rs
给方言结构体实现trait
// 创建自己的 sql 方言。TyrDialect 支持 identifier 可以是简单的 url impl Dialect for TyrDialect { fn is_identifier_start(&self, ch: char) -> bool { ('a'..='z').contains(&ch) || ('A'..='Z').contains(&ch) || ch == '_' } // identifier 可以有 ':', '/', '?', '&', '=' fn is_identifier_part(&self, ch: char) -> bool { ('a'..='z').contains(&ch) || ('A'..='Z').contains(&ch) || ('0'..='9').contains(&ch) || [':', '/', '?', '&', '=', '-', '_', '.'].contains(&ch) } }
添加测试用函数
/// Test helper: builds a sample query against the OWID COVID-19 CSV file.
pub fn example_sql() -> String {
    const URL: &str = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/latest/owid-covid-latest.csv";

    format!(
        "SELECT location name, total_cases, new_cases, total_deaths, new_deaths \
         FROM {} where new_deaths >= 500 ORDER BY new_cases DESC LIMIT 6 OFFSET 5",
        URL
    )
}
单元测试
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::parser::Parser;

    /// The sample SQL, whose data source is a URL, parses under `TyrDialect`.
    #[test]
    fn it_works() {
        let dialect = TyrDialect::default();
        let parsed = Parser::parse_sql(&dialect, &example_sql());
        assert!(parsed.is_ok());
    }
}
src/loader.rs
定义Loader与CsvLoader
/// The available data loaders; marked `#[non_exhaustive]` so further variants
/// can be added later.
#[derive(Debug)]
#[non_exhaustive]
pub enum Loader {
    /// Loader backed by a [`CsvLoader`].
    Csv(CsvLoader),
}

/// Newtype around a `String`.
// NOTE(review): presumably the wrapped string is the raw CSV text to load — confirm.
#[derive(Default, Debug)]
pub struct CsvLoader(pub(crate) String);
定义trait并给CsvLoader实现
let sql = format!( "SELECT location name, total_cases, new_cases, total_deaths, new_deaths \ FROM {} where new_deaths >= 500 ORDER BY new_cases DESC LIMIT 6 OFFSET 5", url ); sql } #[cfg(test)] mod tests { use super::*; use sqlparser::parser::Parser;
todo: 给CsvLoader添加内容检测
fn it_works() { assert!(Parser::parse_sql(&TyrDialect::default(), &example_sql()).is_ok()); } }
src/fetcher.rs
定义UrlFetcher与FileFetcher
/// Newtype over a borrowed source string, used for sources fetched by URL.
struct UrlFetcher<'a>(pub(crate) &'a str);

/// Newtype over a borrowed source string, used for sources read from a file.
// NOTE(review): the string is expected to look like `file://<filename>` — confirm against callers.
struct FileFetcher<'a>(pub(crate) &'a str);
定义trait并给UrlFetcher与FileFetcher实现
// Rust 的 async trait 还没有稳定,可以用 async_trait 宏 #[async_trait] pub trait Fetch { type Error; async fn fetch(&self) -> Result<String, Self::Error>; } #[async_trait] impl<'a> Fetch for UrlFetcher<'a> { type Error = anyhow::Error; async fn fetch(&self) -> Result<String, Self::Error> { Ok(reqwest::get(self.0).await?.text().await?) } } #[async_trait] impl<'a> Fetch for FileFetcher<'a> { type Error = anyhow::Error; async fn fetch(&self) -> Result<String, Self::Error> { Ok(fs::read_to_string(&self.0[7..]).await?) } }
最后定义一个获取数据的方法
/// 从文件源或者 http 源中获取数据,返回字符串 pub async fn retrieve_data(source: impl AsRef<str>) -> Result<String> { let name = source.as_ref(); match &name[..4] { // 包括 http / https "http" => UrlFetcher(name).fetch().await, // 处理 file://<filename> "file" => FileFetcher(name).fetch().await, _ => Err(anyhow!("We only support http/https/file at the moment")), } }
queryer-js package: 使用neon
Cargo.toml
[package]
name = "queryer-js"
version = "0.1.0"
license = "ISC"
edition = "2021"
exclude = ["index.node"]
[lib]
crate-type = ["cdylib"]
[dependencies]
anyhow = "1"
queryer = { path = "../queryer" }
tokio = { version = "1", features = ["full"] }
[dependencies.neon]
version = "0.9"
default-features = false
features = ["napi-6"]
build in package.json
{
"name": "queryer-js",
"version": "0.1.0",
"description": "",
"main": "index.node",
"scripts": {
"build": "cargo-cp-artifact -nc index.node -- cargo build --message-format=json-render-diagnostics",
"build-debug": "npm run build --",
"build-release": "npm run build -- --release",
"install": "npm run build-release",
"test": "cargo test"
},
"author": "",
"license": "ISC",
"devDependencies": {
"cargo-cp-artifact": "^0.1"
}
}
src/lib.rs
use neon::prelude::*; pub fn example_sql(mut cx: FunctionContext) -> JsResult<JsString> { Ok(cx.string(queryer::example_sql())) } fn query(mut cx: FunctionContext) -> JsResult<JsString> { let sql = cx.argument::<JsString>(0)?.value(&mut cx); let output = match cx.argument::<JsString>(1) { Ok(v) => v.value(&mut cx), Err(_) => "csv".to_string(), }; let rt = tokio::runtime::Runtime::new().unwrap(); let data = rt.block_on(async { queryer::query(sql).await.unwrap() }); match output.as_str() { "csv" => Ok(cx.string(data.to_csv().unwrap())), v => cx.throw_type_error(format!("Output type {} not supported", v)), } } #[neon::main] fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("example_sql", example_sql)?; cx.export_function("query", query)?; Ok(()) }
queryer-py package: 使用pyo3
python调用查询包
Cargo.toml
[package]
name = "queryer_py" # Python 模块需要用下划线
version = "0.1.0"
edition = "2021"
[lib]
crate-type = ["cdylib"] # 使用 cdylib 类型
[dependencies]
queryer = { path = "../queryer" } # 引入 queryer
tokio = { version = "1", features = ["full"] }
[dependencies.pyo3]
version = "0.14"
features = ["extension-module"]
[build-dependencies]
pyo3-build-config = "0.14"
src/lib.rs
// NOTE(review): presumably this lint fires on code generated by `#[pyfunction]`
// for the `Option<&str>` parameter — confirm before removing.
#![allow(clippy::needless_option_as_deref)]
use pyo3::{exceptions, prelude::*};

/// Returns the sample SQL statement from `queryer`.
#[pyfunction]
pub fn example_sql() -> PyResult<String> {
    Ok(queryer::example_sql())
}

/// Runs a SQL query and returns the result serialized in the requested format.
///
/// `output` defaults to `"csv"`, which is currently the only supported format.
///
/// Raises `RuntimeError` when the runtime cannot be created, the query fails
/// or the result cannot be serialized, and `TypeError` for an unsupported
/// output format.
#[pyfunction]
pub fn query(sql: &str, output: Option<&str>) -> PyResult<String> {
    // Failures become Python exceptions instead of panicking inside the extension.
    let rt = tokio::runtime::Runtime::new()
        .map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string()))?;

    let data = rt
        .block_on(async { queryer::query(sql).await })
        .map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string()))?;

    match output {
        Some("csv") | None => data
            .to_csv()
            .map_err(|e| exceptions::PyRuntimeError::new_err(e.to_string())),
        Some(v) => Err(exceptions::PyTypeError::new_err(format!(
            "Output type {} not supported",
            v
        ))),
    }
}

/// Python module definition: exposes `query` and `example_sql`.
#[pymodule]
fn queryer_py(_py: Python, m: &PyModule) -> PyResult<()> {
    m.add_function(wrap_pyfunction!(query, m)?)?;
    m.add_function(wrap_pyfunction!(example_sql, m)?)?;
    Ok(())
}
data-viewer package: 使用tauri
Cargo.toml
[package]
name = "app"
version = "0.1.0"
description = "A Tauri App"
authors = ["you"]
license = ""
repository = ""
default-run = "app"
edition = "2021"
build = "src/build.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[build-dependencies]
tauri-build = { version = "1.0.0-beta.4" }

[dependencies]
anyhow = "1"
serde_json = "1"
queryer = { path = "../../queryer" }
serde = { version = "1.0", features = ["derive"] }
tauri = { version = "1.0.0-beta.8", features = ["api-all"] }

[features]
default = [ "custom-protocol" ]
custom-protocol = [ "tauri/custom-protocol" ]
main.rs
#![cfg_attr(
    all(not(debug_assertions), target_os = "windows"),
    windows_subsystem = "windows"
)]

/// Tauri command: returns the sample SQL statement from `queryer`.
#[tauri::command]
fn example_sql() -> String {
    queryer::example_sql()
}

/// Tauri command: runs `sql` and returns the result as CSV text.
///
/// Any failure is converted to its string message for the frontend.
#[tauri::command]
async fn query(sql: String) -> Result<String, String> {
    let result = queryer::query(&sql).await.map_err(|e| e.to_string())?;
    result.to_csv().map_err(|e| e.to_string())
}

/// Builds the Tauri application with both commands registered and runs it.
fn main() {
    let app = tauri::Builder::default()
        .invoke_handler(tauri::generate_handler![example_sql, query]);

    app.run(tauri::generate_context!())
        .expect("error while running tauri application");
}