Title: Extract-Transform-Load Framework for Medium Data
Description: A predictable and pipeable framework for performing ETL (extract-transform-load) operations on publicly-accessible medium-sized data sets. This package sets up the method structure and implements generic functions. Packages that depend on this package download specific data sets from the Internet, clean them up, and import them into a local or remote relational database management system.
Authors: Benjamin S. Baumer [aut, cre], Carson Sievert [ctb], Natalia Iannucci [ctb]
Maintainer: Benjamin S. Baumer <[email protected]>
License: CC0
Version: 0.4.1
Built: 2024-11-05 03:27:47 UTC
Source: https://github.com/beanumber/etl
Create an ETL package skeleton
create_etl_package(...)
...: arguments passed to create_package

Extends create_package and places a template source file in the R subdirectory of the new package. The file has a working stub of etl_extract().
The new package can be built and run immediately. New S3 methods for etl_transform() and etl_load() can be added if necessary, but the default methods may suffice.
See also: etl_extract(), etl_transform(), etl_load()
## Not run:
path <- file.path(tempdir(), "scorecard")
create_etl_package(path)
## End(Not run)
# Now switch projects, and "Install and Restart"
Return the database type for an ETL or DBI connection
db_type(obj, ...)
## S3 method for class 'src_dbi'
db_type(obj, ...)
## S3 method for class 'DBIConnection'
db_type(obj, ...)
obj: a src_dbi or DBIConnection object

...: currently ignored
if (require(RMySQL) && mysqlHasDefault()) {
  # connect to test database using rs-dbi
  db <- src_mysql_cnf()
  class(db)
  db
  # connect to another server using the 'client' group
  db_type(db)
  db_type(db$con)
}
Execute an SQL script
dbRunScript(conn, script, echo = FALSE, ...)
conn: a DBIConnection object

script: either a filename pointing to an SQL script, or a character vector of length 1 containing SQL

echo: print the SQL commands to the output?

...: arguments passed to dbExecute
The SQL script file must be semicolon-delimited. Returns a list of results from dbExecute() for each of the individual SQL statements in script.
sql <- "SHOW TABLES; SELECT 1+1 as Two;"
sql2 <- system.file("sql", "mtcars.mysql", package = "etl")
sql3 <- "SELECT * FROM user WHERE user = 'mysql';SELECT * FROM user WHERE 't' = 't';"
if (require(RSQLite)) {
  con <- dbConnect(RSQLite::SQLite())
  dbRunScript(con, "SELECT 1+1 as Two; VACUUM; ANALYZE;")
}
## Not run:
if (require(RMySQL)) {
  con <- dbConnect(RMySQL::MySQL(), default.file = path.expand("~/.my.cnf"),
                   group = "client", user = NULL, password = NULL,
                   dbname = "mysql", host = "127.0.0.1")
  dbRunScript(con, script = sql)
  dbRunScript(con, script = sql2)
  dbRunScript(con, script = sql3)
  dbDisconnect(con)
}
## End(Not run)
Wipe out all tables in a database
dbWipe(conn, ...)
conn: a DBIConnection object, as returned by dbConnect()

...: other parameters passed on to methods
Finds all tables within a database and removes them.
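The operation described above can be sketched in a few lines of plain DBI code. This is an illustrative reimplementation, not the package's actual method, and it ignores any edge cases dbWipe() may handle:

```r
# A minimal sketch of the idea behind dbWipe(): list every table the
# connection can see, then remove each one. Uses an in-memory SQLite
# database so the sketch is self-contained.
library(DBI)
library(RSQLite)

con <- dbConnect(RSQLite::SQLite())
dbWriteTable(con, "a", data.frame(x = 1))
dbWriteTable(con, "b", data.frame(y = 2))

# drop every table
for (tbl in dbListTables(con)) {
  dbRemoveTable(con, tbl)
}
dbListTables(con)  # character(0)
dbDisconnect(con)
```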
Initialize an etl object
etl(x, db = NULL, dir = tempdir(), ...)
## Default S3 method:
etl(x, db = NULL, dir = tempdir(), ...)
## S3 method for class 'etl'
summary(object, ...)
is.etl(object)
## S3 method for class 'etl'
print(x, ...)
x: the name of the etl package corresponding to the data source

db: a database connection that inherits from src_dbi

dir: a directory to store the raw and processed data files

...: arguments passed to methods (currently ignored)

object: an object for which a summary is desired
A constructor function that instantiates an etl object. An etl object extends a src_dbi object. It also has attributes for:

- the name of the etl package corresponding to the data source
- the directory where the raw and processed data are stored
- the directory where the raw data files are stored
- the directory where the processed data files are stored

Just like any src_dbi object, an etl object is a data source backed by an SQL database. However, an etl object has additional functionality based on the presumption that the SQL database will be populated from data files stored on the local hard disk. The ETL functions documented in etl_create provide the necessary functionality for extracting data from the Internet to raw_dir, transforming those data and placing the cleaned up data (usually in CSV format) into load_dir, and finally loading the clean data into the SQL database.
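The directory attributes mentioned above can be inspected directly with attr(). The attribute names "pkg", "raw_dir", and "load_dir" follow the usage shown elsewhere on this page:

```r
# Peek at the metadata an etl object carries; attribute names are
# taken from the usage shown in this documentation.
library(etl)

cars <- etl("mtcars")
attr(cars, "pkg")       # the package providing the data source
attr(cars, "raw_dir")   # where raw downloads land
attr(cars, "load_dir")  # where cleaned-up CSVs land
```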
For etl, an object of class etl_x and etl that inherits from src_dbi. For is.etl, TRUE or FALSE, depending on whether x has class etl.
# Instantiate the etl object
cars <- etl("mtcars")
str(cars)
is.etl(cars)
summary(cars)
## Not run:
# connect to a PostgreSQL server
if (require(RPostgreSQL)) {
  db <- src_postgres("mtcars", user = "postgres", host = "localhost")
  cars <- etl("mtcars", db)
}
## End(Not run)
# Do it step-by-step
cars %>%
  etl_extract() %>%
  etl_transform() %>%
  etl_load()
src_tbls(cars)
cars %>%
  tbl("mtcars") %>%
  group_by(cyl) %>%
  summarize(N = n(), mean_mpg = mean(mpg))
# Do it all in one step
cars2 <- etl("mtcars")
cars2 %>% etl_update()
src_tbls(cars2)
# generic summary function provides information about the object
cars <- etl("mtcars")
summary(cars)
cars <- etl("mtcars")
# returns TRUE
is.etl(cars)
# returns FALSE
is.etl("hello world")
cars <- etl("mtcars") %>% etl_create()
cars
These generic functions provide a systematic approach for performing ETL (extract-transform-load) operations on medium-sized data.
etl_cleanup(obj, ...)
## Default S3 method:
etl_cleanup(obj, delete_raw = FALSE, delete_load = FALSE,
            pattern = "\\.(csv|zip)$", ...)
etl_create(obj, ...)
## Default S3 method:
etl_create(obj, ...)
etl_update(obj, ...)
## Default S3 method:
etl_update(obj, ...)
etl_extract(obj, ...)
## Default S3 method:
etl_extract(obj, ...)
## S3 method for class 'etl_mtcars'
etl_extract(obj, ...)
## S3 method for class 'etl_cities'
etl_extract(obj, ...)
etl_load(obj, ...)
## Default S3 method:
etl_load(obj, ...)
etl_transform(obj, ...)
## Default S3 method:
etl_transform(obj, ...)
## S3 method for class 'etl_cities'
etl_transform(obj, ...)
obj: an etl object

...: arguments passed to methods

delete_raw: should files be deleted from the raw_dir?

delete_load: should files be deleted from the load_dir?

pattern: regular expression matching file names to be deleted. By default, this matches filenames ending in .csv or .zip.
The purpose of these functions is to download data from a particular source on the Internet, process it, and load it into a SQL database server.

There are five primary functions:

etl_init: Initialize the database schema.
etl_extract: Download data from the Internet and store it locally in its raw form.
etl_transform: Manipulate the raw data such that it can be loaded into a database table. Usually, this means converting the raw data to (a series of) CSV files, which are also stored locally.
etl_load: Load the transformed data into the database.
etl_cleanup: Perform housekeeping, such as deleting unnecessary raw data files.

Additionally, two convenience functions chain these operations together:

etl_create: Run all five functions in succession. This is useful when you want to create the database from scratch.
etl_update: Run the etl_extract, etl_transform, and etl_load functions in succession. This is useful when the database already exists, but you want to insert some new data.

Each one of these functions returns an etl object, invisibly.
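A package that extends etl supplies S3 methods along these lines. This sketch uses a hypothetical "foo" data source; the class name etl_foo and the URL are illustrative stand-ins, not part of any real package:

```r
# A hedged sketch of how a dependent package would implement
# etl_extract() for a hypothetical data source "foo". The URL and
# the class name etl_foo are illustrative only.
etl_extract.etl_foo <- function(obj, ...) {
  src <- "https://example.com/data/foo.csv"  # hypothetical source URL
  smart_download(obj, src)                   # fetch only files not already present
  invisible(obj)                             # return the etl object, invisibly
}
```

Because etl("foo") tags the object with class etl_foo, the pipeline cars %>% etl_extract() %>% etl_transform() %>% etl_load() dispatches to this method automatically, falling back to the defaults for any step the package does not define.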
## Not run:
if (require(RPostgreSQL)) {
  db <- src_postgres(dbname = "mtcars", user = "postgres", host = "localhost")
  cars <- etl("mtcars", db)
}
if (require(RMySQL) && mysqlHasDefault()) {
  db <- src_mysql(dbname = "mtcars", user = "r-user", host = "localhost",
                  password = "mypass")
  cars <- etl("mtcars", db)
}
## End(Not run)
cars <- etl("mtcars")
cars %>%
  etl_extract() %>%
  etl_transform() %>%
  etl_load() %>%
  etl_cleanup()
cars
cars %>%
  tbl(from = "mtcars") %>%
  group_by(cyl) %>%
  summarise(N = n(), mean_mpg = mean(mpg))
# do it all in one step, and peek at the SQL creation script
cars %>% etl_create(echo = TRUE)
# specify a directory for the data
## Not run:
cars <- etl("mtcars", dir = "~/dumps/mtcars/")
str(cars)
## End(Not run)
cars <- etl("mtcars")
# Do it step-by-step
cars %>%
  etl_extract() %>%
  etl_transform() %>%
  etl_load()
# Note the somewhat imprecise data types for the columns. These are the default.
tbl(cars, "mtcars")
# But you can also specify your own schema if you want
schema <- system.file("sql", "init.sqlite", package = "etl")
cars %>%
  etl_init(schema) %>%
  etl_load()
Initialize a database using a defined schema
etl_init(obj, script = NULL, schema_name = "init", pkg = attr(obj, "pkg"),
         ext = NULL, ...)
## Default S3 method:
etl_init(obj, script = NULL, schema_name = "init", pkg = attr(obj, "pkg"),
         ext = NULL, ...)
find_schema(obj, schema_name = "init", pkg = attr(obj, "pkg"), ext = NULL, ...)
obj: an etl object

script: either a vector of SQL commands to be executed, or a file path as a character vector containing an SQL initialization script. If NULL (the default), the appropriate built-in schema is located with find_schema().

schema_name: the name of the schema. Default is "init".

pkg: the package defining the schema. Defaults to the pkg attribute of obj.

ext: the file extension used for the SQL schema file. If NULL (the default), it will be inferred from the database type of obj.

...: currently ignored
If the table definitions are at all non-trivial, you may wish to include a pre-defined table schema. This function will retrieve it.
cars <- etl("mtcars")
cars %>% etl_init()
cars %>% etl_init(script = sql("CREATE TABLE IF NOT EXISTS mtcars_alt (id INTEGER);"))
cars %>% etl_init(schema_name = "init")
init_script <- find_schema(cars, schema_name = "init")
cars %>% etl_init(script = init_script, echo = TRUE)
src_tbls(cars)
cars <- etl("mtcars")
find_schema(cars)
find_schema(cars, "init", "etl")
find_schema(cars, "my_crazy_schema", "etl")
Match year and month vectors to filenames
Extracts a date from filenames
match_files_by_year_months(files, pattern,
  years = as.numeric(format(Sys.Date(), "%Y")), months = 1:12, ...)
extract_date_from_filename(files, pattern, ...)
files: a character vector of filenames

pattern: a regular expression used to match dates in the filenames

years: a numeric vector of years

months: a numeric vector of months

...: further arguments passed on to other functions
For match_files_by_year_months, a character vector of files that match the pattern, years, and months arguments. For extract_date_from_filename, a vector of POSIXct dates matching the pattern.
## Not run:
if (require(airlines)) {
  airlines <- etl("airlines", dir = "~/Data/airlines") %>%
    etl_extract(year = 1987)
  summary(airlines)
  match_files_by_year_months(list.files(attr(airlines, "raw_dir")),
    pattern = "On_Time_On_Time_Performance_%Y_%m.zip", year = 1987)
}
## End(Not run)
Download only those files that don't already exist
smart_download(obj, src, new_filenames = basename(src), clobber = FALSE, ...)
obj: an etl object

src: a character vector of URLs that you want to download

new_filenames: an optional character vector of filenames for the new (local) files. Defaults to having the same filenames as those in src.

clobber: do you want to clobber any existing files?

...: arguments passed to download.file
Downloads only those files in src that are not already present in the directory specified by the raw_dir attribute of obj. Idiom courtesy of Hadley Wickham.
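The "download only what's missing" idiom can be sketched in a few lines of base R. This is an illustration of the technique, not the package's implementation; the helper name download_missing is hypothetical:

```r
# A minimal sketch of the idiom behind smart_download(): compute the
# local destination paths, then fetch only the files that don't exist
# yet (or all of them, when clobber = TRUE).
download_missing <- function(urls, raw_dir, clobber = FALSE) {
  dest <- file.path(raw_dir, basename(urls))
  todo <- clobber | !file.exists(dest)
  mapply(utils::download.file, urls[todo], dest[todo])
  invisible(dest)
}
```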
## Not run:
cars <- etl("mtcars")
urls <- c("https://raw.githubusercontent.com/beanumber/etl/master/etl.Rproj",
          "https://www.reddit.com/robots.txt")
smart_download(cars, src = urls)
# won't download again if the files are already there
smart_download(cars, src = urls)
# use clobber to overwrite
smart_download(cars, src = urls, clobber = TRUE)
## End(Not run)
Upload a list of files to the DB
smart_upload(obj, src = NULL, tablenames = NULL, ...)
obj: an etl object

src: a list of CSV files to upload. If NULL, the default, all files in the load_dir are used.

tablenames: a list, the same length as src, giving the database table into which each file is uploaded

...: arguments passed to dbWriteTable
## Not run:
if (require(RMySQL)) {
  # must have pre-existing database "fec"
  # if not, try system("mysql -e 'CREATE DATABASE IF NOT EXISTS fec;'")
  db <- src_mysql_cnf(dbname = "mtcars")
}
## End(Not run)
Connect to local MySQL Server using ~/.my.cnf
src_mysql_cnf(dbname = "test", groups = "rs-dbi", ...)
dbname: name of the local database you wish to connect to. Default is "test".

groups: section of the ~/.my.cnf file to use. Default is "rs-dbi".

...: arguments passed to src_mysql
if (require(RMySQL) && mysqlHasDefault()) {
  # connect to test database using rs-dbi
  db <- src_mysql_cnf()
  class(db)
  db
  # connect to another server using the 'client' group
  src_mysql_cnf(groups = "client")
}
Ensure that years and months are within a certain time span
valid_year_month(years, months, begin = "1870-01-01", end = Sys.Date())
years: a numeric vector of years

months: a numeric vector of months

begin: the earliest valid date. Defaults to "1870-01-01".

end: the most recent valid date. Defaults to today.
Often, a data source will begin and end at known points in time. At the same time, many data sources are divided into monthly archives. Given a set of years and months, any combination of which should be considered valid, this function will return a data.frame in which each row is one of those valid year-month pairs. Further, if the optional begin and end arguments are specified, the rows will be filtered to lie within that time interval. Furthermore, the first and last day of each month are computed.

Returns a data.frame with four variables: year, month, month_begin (the first day of the month), and month_end (the last day of the month).
valid_year_month(years = 1999:2001, months = c(1:3, 7))
# Mets in the World Series since the UNIX epoch
mets_ws <- c(1969, 1973, 1986, 2000, 2015)
valid_year_month(years = mets_ws, months = 10)
# Mets in the World Series during the Clinton administration
if (require(ggplot2)) {
  clinton <- filter(presidential, name == "Clinton")
  valid_year_month(years = mets_ws, months = 10,
                   begin = clinton$start, end = clinton$end)
}