Easily caching expensive tasks

Data pipeline

invoke test

You should observe that the ‘expensive’ get step only happened the first time.

tasks.py

Below is a relatively simple tasks file that defines a few tasks. A couple of them do something expensive and cache their results on the filesystem:

  • get_people - Writes a list of names to people.txt in line-separated form
  • get_peoples_ages - Returns a list of names + ages for the names in people.txt; magicinvoke caches the return value in an implicitly created file
  • print_peoples_ages - Prints the data returned by get_peoples_ages

These tasks depend on magicinvoke.skippable() to recognize that they don’t need to execute when their output files are newer than all of their input files (or, in the case of get_people, when the output file exists). That is, after print-peoples-ages has been called once, get-people and get-peoples-ages should be skipped on every subsequent run, unless someone modifies people.txt or the parameters to get-peoples-ages change.
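The timestamp rule described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not magicinvoke's actual implementation; the helper name is_up_to_date and its exact tie-breaking behavior (equal timestamps count as up to date) are assumptions made for this example.

```python
from pathlib import Path


def is_up_to_date(input_paths, output_paths):
    """Hypothetical helper: True if a task with these files could be skipped.

    A task is considered up to date when every output exists and the oldest
    output is at least as new as the newest input. With no inputs at all,
    it is enough for the outputs to exist.
    """
    inputs = [Path(p) for p in input_paths]
    outputs = [Path(p) for p in output_paths]

    # No outputs, or a missing output, always means the task has to run.
    if not outputs or not all(p.exists() for p in outputs):
        return False

    # A missing input cannot be compared, so run the task to be safe.
    if not all(p.exists() for p in inputs):
        return False

    oldest_output = min(p.stat().st_mtime for p in outputs)
    newest_input = max(
        (p.stat().st_mtime for p in inputs), default=float("-inf")
    )
    return oldest_output >= newest_input
```

Under this sketch, a task like get_people (no inputs, one output) is skipped as soon as people.txt exists, while a task that reads people.txt runs again whenever that file is modified after the task's outputs were written.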

from magicinvoke import Collection, Lazy, Path

ns = Collection()

"""
Two types of skippable tasks are demonstrated here:
    1. get_people, which is clearly skippable, because its output goes into a file.
    2. get_peoples_ages, which doesn't create an output file but returns a value.
      a) Magicinvoke implicitly creates a file to cache the return value
         of this function. Your return values must be pickleable.
"""


@ns.magictask(skippable=True)
def get_people(ctx, names_output_path=Lazy("ctx.people.names_path")):
    print("get_people called")
    Path(names_output_path).write_text(u"Tom\nJerry\nBill Nye\n")
    print("Wrote {}".format(names_output_path))


@ns.magictask(
    params_from="ctx.people", pre=[get_people], skippable=True, autoprint=True
)
def get_peoples_ages(ctx, names_path, important_flag=False):
    results = []
    print("get_peoples_ages called")
    for name in Path(names_path).read_text().splitlines():
        print("Getting age for {}".format(name))
        # We can pretend this is an expensive processing step where we pull
        # some numbers from a DB :)
        results.append((name, 39))
    print("Done pulling results!")
    return results


@ns.magictask
def print_peoples_ages(ctx):
    print("print_peoples_ages called")
    names_and_ages = get_peoples_ages(ctx)
    for tup in names_and_ages:
        print("{name}'s age is {age}".format(name=tup[0], age=tup[1]))
    print("Done!")


ns.configure(
    {
        "people": {
            "names_path": "people.txt",
        }
    }
)
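The comment in the listing notes that a skippable task's return value is cached in an implicitly created file and must be pickleable. A rough sketch of that idea, written without magicinvoke, is shown below. The helper cached_call, the cache directory argument, and the key derivation are all hypothetical and chosen for illustration; how magicinvoke names and invalidates its cache files is not covered here.

```python
import hashlib
import pickle
from pathlib import Path


def cached_call(func, cache_dir, *args, **kwargs):
    """Hypothetical helper: reuse a pickled result for identical arguments."""
    # Derive a cache key from the function name and its arguments, so a
    # change in parameters leads to a different cache file.
    key_bytes = pickle.dumps((func.__name__, args, sorted(kwargs.items())))
    cache_file = Path(cache_dir) / (hashlib.sha256(key_bytes).hexdigest() + ".pkl")

    if cache_file.exists():
        # Cache hit: skip the expensive call and load the stored result.
        return pickle.loads(cache_file.read_bytes())

    # Cache miss: run the function and store its (pickleable) result.
    result = func(*args, **kwargs)
    cache_file.write_bytes(pickle.dumps(result))
    return result
```

This sketch only keys on the arguments. It does not look at input file timestamps, so on its own it would not notice that people.txt was modified.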