Easily caching expensive tasks
Data pipeline
invoke test
You should observe that the ‘expensive’ get step only happened the first time.
tasks.py
Below is a relatively simple tasks file that defines a few tasks, a couple of which do something expensive and cache their results on the filesystem:
- get_people - Writes a list of names to people.txt in line-separated form
- get_peoples_ages - Writes a list of names + ages to people-with-ages.txt in line-separated form
- print-peoples-ages - Prints the data found in people-with-ages.txt
These tasks depend on magicinvoke.skippable() to recognize that they don’t need to actually execute if their output files are newer than any input files (or, in the case of get_people, if the output file exists). That is, after print-peoples-ages has been called once, get-people and get-peoples-ages should be skipped every subsequent time, unless someone modifies people.txt or the parameters to get-peoples-ages change.
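The skip rule described above (skip when every output exists and is newer than every input) can be sketched in plain Python. This is only an illustration under stated assumptions, not magicinvoke's actual implementation: the helper name can_skip and the file name ages.pickle are hypothetical, and the sketch compares file modification times only, ignoring changed parameters.

```python
import os
import tempfile
from pathlib import Path


def can_skip(input_paths, output_paths):
    """Return True when a task's outputs are already up to date.

    Hypothetical helper for illustration; not part of magicinvoke.
    """
    outputs = [Path(p) for p in output_paths]
    inputs = [Path(p) for p in input_paths]
    # A task with no outputs, or with a missing output, always has to run.
    if not outputs or not all(p.exists() for p in outputs):
        return False
    # A missing input cannot be compared; run the task so it can complain.
    if not all(p.exists() for p in inputs):
        return False
    # No inputs at all (like get_people): an existing output is enough.
    if not inputs:
        return True
    newest_input = max(p.stat().st_mtime for p in inputs)
    oldest_output = min(p.stat().st_mtime for p in outputs)
    return oldest_output > newest_input


with tempfile.TemporaryDirectory() as tmp:
    names = Path(tmp) / "people.txt"
    ages = Path(tmp) / "ages.pickle"
    names.write_text("Tom\nJerry\n")
    print(can_skip([names], [ages]))  # output missing, so the task must run

    ages.write_bytes(b"placeholder")
    # Set modification times explicitly so the demo does not depend on timing.
    os.utime(names, (1000, 1000))
    os.utime(ages, (2000, 2000))
    print(can_skip([names], [ages]))  # output is newer than the input

    os.utime(names, (3000, 3000))
    print(can_skip([names], [ages]))  # input was modified after the output
```

Comparing the oldest output against the newest input is the conservative choice: a single stale output or a single freshly edited input is enough to force the task to run again.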
```python
from magicinvoke import Collection, Lazy, Path

ns = Collection()

"""
Two types of skippable tasks are demonstrated here:
  1. get_people, which is clearly skippable, because its output goes into a file.
  2. get_peoples_ages, which doesn't create an output file but returns a value.
     a) Magicinvoke implicitly creates a file to cache the return value
        of this function. Your return values must be pickleable.
"""


@ns.magictask(skippable=True)
def get_people(ctx, names_output_path=Lazy("ctx.people.names_path")):
    print("get_people called")
    Path(names_output_path).write_text(u"Tom\nJerry\nBill Nye\n")
    print("Wrote {}".format(names_output_path))


@ns.magictask(params_from="ctx.people", pre=[get_people], skippable=True, autoprint=True)
def get_peoples_ages(ctx,
                     names_path,
                     important_flag=False
                     ):
    results = []
    print("get_peoples_ages called")
    for name in Path(names_path).read_text().splitlines():
        print("Getting age for {}".format(name))
        # We can pretend this is an expensive processing step where we pull
        # some numbers from a DB :)
        results.append((name, 39))
    print("Done pulling results!")
    return results


@ns.magictask
def print_peoples_ages(ctx):
    print("print_peoples_ages called")
    names_and_ages = get_peoples_ages(ctx)
    for tup in names_and_ages:
        print("{name}'s age is {age}".format(name=tup[0], age=tup[1]))
    print("Done!")


ns.configure(
    {
        "people": {
            "names_path": "people.txt",
        }
    }
)
```
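The comment block in the tasks file notes that a return value is cached in an implicitly created file and must be pickleable. A rough, standard-library-only sketch of that idea follows. It is an assumption about the general technique, not magicinvoke's code: the decorator cache_return_value, the function get_ages, and the cache file naming scheme are all hypothetical, and the sketch keys the cache on the call arguments only, without checking input file timestamps.

```python
import functools
import hashlib
import pickle
import tempfile
from pathlib import Path


def cache_return_value(cache_dir):
    """Decorator factory that pickles a function's return value to disk.

    Hypothetical helper for illustration; not magicinvoke's API.
    """
    cache_dir = Path(cache_dir)

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # The key covers the arguments, so changing a parameter selects
            # a different cache file and forces a fresh call.
            key_bytes = pickle.dumps((args, sorted(kwargs.items())))
            digest = hashlib.sha256(key_bytes).hexdigest()[:16]
            cache_file = cache_dir / "{}-{}.pickle".format(func.__name__, digest)
            if cache_file.exists():
                return pickle.loads(cache_file.read_bytes())
            result = func(*args, **kwargs)
            cache_dir.mkdir(parents=True, exist_ok=True)
            # This is why the return value has to be pickleable.
            cache_file.write_bytes(pickle.dumps(result))
            return result

        return wrapper

    return decorator


calls = []  # records every real (uncached) execution

with tempfile.TemporaryDirectory() as tmp:

    @cache_return_value(tmp)
    def get_ages(names, important_flag=False):
        calls.append(names)
        return [(name, 39) for name in names]

    first = get_ages(("Tom", "Jerry"))
    second = get_ages(("Tom", "Jerry"))  # read back from the pickle file
    third = get_ages(("Tom", "Jerry"), important_flag=True)  # new key, runs again
    print(first == second, len(calls))
```

Hashing the pickled arguments keeps the cache file name short while still separating calls made with different parameters, which mirrors the behavior described earlier where changing the parameters to get-peoples-ages triggers a re-run.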