Chapter 21: Working with Data
1. Overview
Most programs that do something useful work with data: reading records from a
file, cleaning up messy input, filtering down to what matters, sorting and
grouping results, and computing summaries. You do not need a third-party
library like pandas to do this well. Python's standard library — csv,
json, pathlib, and a handful of built-in functions — is enough for a wide
range of real tasks.
This chapter walks through the full cycle: loading data from CSV and JSON files, cleaning it, transforming it, and producing useful output. Every technique here uses only the standard library.
2. What You Will Learn
- Reading tabular data from CSV files with csv.DictReader
- Reading structured data from JSON files
- Cleaning data: stripping whitespace, handling missing values, converting types
- Filtering records with list comprehensions and functions
- Sorting records by one or more fields
- Grouping records by a key
- Using lists of dicts as an in-memory table
- Simple aggregation: sum, average, count, min, max
- Writing cleaned or processed data back to CSV or JSON
- Building small data pipelines from composable functions
3. Core Concepts
3.1 The List-of-Dicts Pattern
The most natural way to represent tabular data in Python — without any third-party library — is a list of dicts. Each dict is one row; the keys are column names.
employees: list[dict] = [
    {"name": "Alice", "department": "Engineering", "salary": 95000},
    {"name": "Bob", "department": "Marketing", "salary": 72000},
    {"name": "Charlie", "department": "Engineering", "salary": 88000},
    {"name": "Diana", "department": "Marketing", "salary": 76000},
]
This structure is easy to read, easy to filter, easy to sort, and maps
directly to what csv.DictReader and json.load produce. All the patterns
in this chapter work on lists of dicts.
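As a small illustration of why this shape is convenient, the sketch below reads one row, pulls out a single column, and adds a derived field. The sample rows mirror the ones above; the "monthly" key is a hypothetical name introduced only for this example.

```python
employees = [
    {"name": "Alice", "department": "Engineering", "salary": 95000},
    {"name": "Bob", "department": "Marketing", "salary": 72000},
]

# One row is a plain dict, so columns are read by key.
first = employees[0]
print(first["name"])  # Alice

# A "column" is a list built from the same key in every row.
salaries = [row["salary"] for row in employees]
print(salaries)  # [95000, 72000]

# Adding a derived field: build new dicts rather than mutating the originals.
# "monthly" is a hypothetical field name used only for this sketch.
with_monthly = [{**row, "monthly": row["salary"] / 12} for row in employees]
print(with_monthly[1]["monthly"])  # 6000.0
```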
3.2 Reading CSV Files
Use csv.DictReader to read a CSV file with a header row. Each row becomes
a dict whose keys are the column names from the header.
import csv
from pathlib import Path
def load_csv(path: str | Path) -> list[dict]:
    """Load a CSV file and return a list of row dicts."""
    with Path(path).open(encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        return list(reader)
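Every value in the returned dicts is a string: csv.DictReader does
not convert types. The one exception is a row with fewer fields than the
header, where the missing fields are filled with None. You will handle type
conversion in the cleaning step.
Always pass newline="" when opening CSV files. The csv module does its own
newline handling; without newline="", line breaks embedded in quoted fields
can be misread, and files written on Windows can end up with an extra \r on
each line.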
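To see what csv.DictReader actually hands back, here is a small sketch that reads from an in-memory string instead of a file. The sample data is made up, and the second row is deliberately one field short.

```python
import csv
import io

# In-memory stand-in for a file; the sample data is made up for this sketch.
text = "name,salary,active\nAlice,95000,true\nBob,72000\n"

rows = list(csv.DictReader(io.StringIO(text)))

# Numbers and booleans arrive as plain strings.
print(rows[0])  # {'name': 'Alice', 'salary': '95000', 'active': 'true'}

# A short row gets None for the fields it is missing.
print(rows[1])  # {'name': 'Bob', 'salary': '72000', 'active': None}
```

Because a short row yields None rather than "", a cleaning helper that calls .strip() directly on the value needs to allow for that case.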
3.3 Reading JSON Files
Use json.load to read a JSON file. If the file contains an array of
objects, you get a list of dicts directly.
import json
from pathlib import Path
def load_json(path: str | Path) -> list[dict]:
    """Load a JSON file containing an array of objects."""
    with Path(path).open(encoding="utf-8") as f:
        return json.load(f)
Unlike CSV, JSON preserves types: numbers stay numbers, booleans stay
booleans, and null becomes None. You may still need to clean string
fields (whitespace, empty strings), but numeric fields are ready to use.
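The type mapping can be checked with a short sketch. It uses json.loads on a made-up string, which parses text the same way json.load parses a file.

```python
import json

# Made-up sample record; note the padded name and the null note.
text = '[{"id": 1, "price": 20.5, "in_stock": true, "note": null, "name": " Widget A "}]'

records = json.loads(text)
product = records[0]

print(type(product["id"]).__name__)     # int
print(type(product["price"]).__name__)  # float
print(product["in_stock"])              # True
print(product["note"])                  # None
print(repr(product["name"]))            # ' Widget A '  (still needs stripping)
```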
3.4 Cleaning Data
Real data is messy. Fields may have leading or trailing whitespace, missing values, or the wrong type. Clean your data right after loading it, before doing anything else.
Stripping whitespace
def strip_fields(record: dict) -> dict:
    """Strip leading/trailing whitespace from all string values."""
    return {k: v.strip() if isinstance(v, str) else v for k, v in record.items()}
Apply it to every record after loading:
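A minimal sketch of that step, using a couple of hypothetical raw rows in place of a loaded file (strip_fields is repeated so the block runs on its own):

```python
def strip_fields(record: dict) -> dict:
    """Strip leading/trailing whitespace from all string values."""
    return {k: v.strip() if isinstance(v, str) else v for k, v in record.items()}

# Hypothetical raw rows standing in for the output of load_csv().
raw_records = [
    {"name": "  Alice ", "department": "Engineering ", "salary": "95000"},
    {"name": "Bob", "department": " Marketing", "salary": None},
]

# One comprehension cleans every record; non-string values pass through.
records = [strip_fields(r) for r in raw_records]

print(records[0])  # {'name': 'Alice', 'department': 'Engineering', 'salary': '95000'}
print(records[1])  # {'name': 'Bob', 'department': 'Marketing', 'salary': None}
```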
Handling missing values
CSV files often represent missing data as an empty string "". Decide what
to do with each field: skip the record, substitute a default, or keep None.
def parse_salary(value: str, default: float = 0.0) -> float:
    """Convert a salary string to float; return default if blank or invalid."""
    value = value.strip()
    if not value:
        return default
    try:
        return float(value.replace(",", ""))
    except ValueError:
        return default
Type conversion
All CSV values start as strings. Convert them to the right type as part of cleaning.
def clean_employee(raw: dict) -> dict:
    """Clean and type-convert a raw employee record from CSV."""
    return {
        "name": raw.get("name", "").strip(),
        "department": raw.get("department", "").strip(),
        "salary": parse_salary(raw.get("salary", "")),
        "active": raw.get("active", "").strip().lower() == "true",
    }
A clean pipeline looks like this:
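As a self-contained sketch, the block below loads from an in-memory string rather than a real employees.csv (the sample rows are made up) and repeats parse_salary and clean_employee from above so it runs on its own:

```python
import csv
import io

def parse_salary(value: str, default: float = 0.0) -> float:
    """Convert a salary string to float; return default if blank or invalid."""
    value = value.strip()
    if not value:
        return default
    try:
        return float(value.replace(",", ""))
    except ValueError:
        return default

def clean_employee(raw: dict) -> dict:
    """Clean and type-convert a raw employee record from CSV."""
    return {
        "name": raw.get("name", "").strip(),
        "department": raw.get("department", "").strip(),
        "salary": parse_salary(raw.get("salary", "")),
        "active": raw.get("active", "").strip().lower() == "true",
    }

# Made-up sample data; io.StringIO stands in for an opened employees.csv.
text = (
    "name,department,salary,active\n"
    ' Alice ,Engineering,"95,000",TRUE\n'
    "Bob,Marketing,,false\n"
)

raw = list(csv.DictReader(io.StringIO(text)))  # load
employees = [clean_employee(r) for r in raw]   # clean

print(employees[0])
# {'name': 'Alice', 'department': 'Engineering', 'salary': 95000.0, 'active': True}
print(employees[1])
# {'name': 'Bob', 'department': 'Marketing', 'salary': 0.0, 'active': False}
```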
After this step, every record has the right types and no stray whitespace. The rest of your code can trust the data.
3.5 Filtering Records
Use a list comprehension to keep only the records that match a condition.
# Keep only Engineering employees
engineers = [e for e in employees if e["department"] == "Engineering"]
# Keep only active employees with salary above 80,000
senior = [
    e for e in employees
    if e["active"] and e["salary"] > 80_000
]
For reusable filters, write a function that takes a predicate:
from collections.abc import Callable
def filter_records(
    records: list[dict],
    predicate: Callable[[dict], bool],
) -> list[dict]:
    """Return records for which predicate returns True."""
    return [r for r in records if predicate(r)]
high_earners = filter_records(employees, lambda e: e["salary"] > 90_000)
3.6 Sorting Records
Use sorted() with a key function. The key receives one record and
returns the value to sort by.
# Sort by salary, highest first
by_salary = sorted(employees, key=lambda e: e["salary"], reverse=True)
# Sort by department, then by name within each department
by_dept_name = sorted(employees, key=lambda e: (e["department"], e["name"]))
sorted() always returns a new list; the original is unchanged. Use
list.sort() if you want to sort in place.
For descending sort on one field and ascending on another, negate the numeric field:
# Highest salary first; alphabetical name as tiebreaker
ranked = sorted(employees, key=lambda e: (-e["salary"], e["name"]))
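Negation only works for numeric fields. For a descending sort on a string field, one option (a sketch, not the only approach) is to sort in two passes and rely on the fact that Python's sort is stable: sort by the secondary key first, then by the primary key with reverse=True.

```python
employees = [
    {"name": "Alice", "department": "Engineering", "salary": 95000},
    {"name": "Bob", "department": "Marketing", "salary": 72000},
    {"name": "Charlie", "department": "Engineering", "salary": 88000},
    {"name": "Diana", "department": "Marketing", "salary": 76000},
]

# Pass 1: secondary key (name, ascending).
by_name = sorted(employees, key=lambda e: e["name"])

# Pass 2: primary key (department, descending). Records that tie on
# department keep the name order from pass 1 because the sort is stable.
ranked = sorted(by_name, key=lambda e: e["department"], reverse=True)

print([(e["department"], e["name"]) for e in ranked])
# [('Marketing', 'Bob'), ('Marketing', 'Diana'), ('Engineering', 'Alice'), ('Engineering', 'Charlie')]
```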
3.7 Grouping Records
Grouping means collecting records that share a value into buckets. The standard pattern uses a dict of lists.
from collections import defaultdict
def group_by(records: list[dict], key: str) -> dict[str, list[dict]]:
    """Group records by the value of a given key."""
    groups: dict[str, list[dict]] = defaultdict(list)
    for record in records:
        groups[record[key]].append(record)
    return dict(groups)
by_department = group_by(employees, "department")
for dept, members in sorted(by_department.items()):
    names = [m["name"] for m in members]
    print(f"{dept}: {', '.join(names)}")
defaultdict(list) automatically creates an empty list for any new key, so
you do not need to check whether the key exists before appending.
3.8 Aggregation
Once you have a list of records (or a group), computing summaries is straightforward with Python's built-in functions.
def summarize(records: list[dict], field: str) -> dict:
    """Return count, sum, average, min, and max for a numeric field."""
    values = [r[field] for r in records if r[field] is not None]
    if not values:
        return {"count": 0, "sum": 0, "avg": None, "min": None, "max": None}
    return {
        "count": len(values),
        "sum": sum(values),
        "avg": sum(values) / len(values),
        "min": min(values),
        "max": max(values),
    }
stats = summarize(employees, "salary")
print(f"Count: {stats['count']}")
print(f"Average: {stats['avg']:,.0f}")
print(f"Min: {stats['min']:,.0f}")
print(f"Max: {stats['max']:,.0f}")
To aggregate per group, combine group_by and summarize:
by_dept = group_by(employees, "department")
for dept, members in sorted(by_dept.items()):
    stats = summarize(members, "salary")
    print(f"{dept}: avg salary {stats['avg']:,.0f} ({stats['count']} people)")
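The helpers above compute the average by hand. As a possible alternative, the standard library's statistics module offers mean and median; the sketch below uses the four sample salaries from section 3.1. Whether the median belongs in your summary is a design choice this chapter does not prescribe.

```python
import statistics

# Salaries from the sample employees list in section 3.1.
salaries = [95000, 72000, 88000, 76000]

average = statistics.mean(salaries)
middle = statistics.median(salaries)  # mean of the two middle values here

print(f"mean:   {average:,.0f}")  # mean:   82,750
print(f"median: {middle:,.0f}")   # median: 82,000
```

Note that statistics.mean raises statistics.StatisticsError on an empty list, so the empty-input check in summarize is still needed.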
3.9 Writing Results
After processing, write results back to CSV or JSON.
Writing CSV
import csv
from pathlib import Path
def save_csv(records: list[dict], path: str | Path) -> None:
    """Write a list of dicts to a CSV file."""
    if not records:
        return
    fieldnames = list(records[0].keys())
    with Path(path).open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(records)
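save_csv takes its column names from the first record. If a later record carries a key the first one lacks, csv.DictWriter raises ValueError by default. One way around that, sketched here with made-up records and an in-memory buffer instead of a file, is to collect every key that appears and let restval fill the gaps.

```python
import csv
import io

# Made-up records; the second one has an extra "team" key.
records = [
    {"name": "Alice", "salary": 95000},
    {"name": "Bob", "salary": 72000, "team": "Growth"},
]

# Collect every key that appears, preserving first-seen order.
fieldnames = list(dict.fromkeys(k for r in records for k in r))

buffer = io.StringIO(newline="")  # in-memory stand-in for an opened file
writer = csv.DictWriter(buffer, fieldnames=fieldnames, restval="")
writer.writeheader()
writer.writerows(records)

print(fieldnames)  # ['name', 'salary', 'team']
print(buffer.getvalue())
```

The row for Alice is written with an empty team column rather than failing.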
Writing JSON
import json
from pathlib import Path
def save_json(data: list | dict, path: str | Path, indent: int = 2) -> None:
    """Write data to a JSON file."""
    with Path(path).open("w", encoding="utf-8") as f:
        json.dump(data, f, indent=indent)
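By default json.dump escapes non-ASCII characters. Since save_json already writes UTF-8, passing ensure_ascii=False is an optional tweak that keeps such text readable in the output file. The sketch uses json.dumps on a made-up record so the difference is visible without touching the filesystem.

```python
import json

# Made-up record containing non-ASCII characters.
record = {"name": "Zoë", "city": "München"}

escaped = json.dumps(record)
readable = json.dumps(record, ensure_ascii=False)

print(escaped)   # {"name": "Zo\u00eb", "city": "M\u00fcnchen"}
print(readable)  # {"name": "Zoë", "city": "München"}

# Both forms load back to the same data.
print(json.loads(escaped) == json.loads(readable))  # True
```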
4. Practical Examples
4.1 Loading and Cleaning a Sales CSV
Suppose you have a sales.csv file:
date,product,units,revenue
2024-01-03,Widget A,12,240.00
2024-01-03,Widget B, 5,200.00
2024-01-04,Widget A,,
2024-01-04,Gadget X,3,600.00
Load, clean, and inspect it:
import csv
from pathlib import Path
def parse_int(value: str, default: int = 0) -> int:
    """Convert a string to int; return default if blank, invalid, or None."""
    try:
        return int(value.strip())
    except (ValueError, AttributeError):
        return default
def parse_float(value: str, default: float = 0.0) -> float:
    """Convert a string to float; return default if blank, invalid, or None."""
    try:
        return float(value.strip())
    except (ValueError, AttributeError):
        return default
def clean_sale(raw: dict) -> dict:
    """Clean and type-convert one raw sales row."""
    return {
        "date": raw.get("date", "").strip(),
        "product": raw.get("product", "").strip(),
        "units": parse_int(raw.get("units", "")),
        "revenue": parse_float(raw.get("revenue", "")),
    }
def load_sales(path: str | Path) -> list[dict]:
    """Load sales.csv and return cleaned records."""
    with Path(path).open(encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        return [clean_sale(row) for row in reader]
# sales = load_sales("sales.csv")
# for s in sales:
#     print(s)
# {'date': '2024-01-03', 'product': 'Widget A', 'units': 12, 'revenue': 240.0}
# {'date': '2024-01-03', 'product': 'Widget B', 'units': 5, 'revenue': 200.0}
# {'date': '2024-01-04', 'product': 'Widget A', 'units': 0, 'revenue': 0.0}
# {'date': '2024-01-04', 'product': 'Gadget X', 'units': 3, 'revenue': 600.0}
The row with missing units and revenue gets default values of 0 instead
of crashing. You can decide later whether to drop those rows or keep them.
4.2 Filtering and Sorting Sales
def top_products_by_revenue(
    sales: list[dict],
    n: int = 5,
) -> list[dict]:
    """Return the top N products by total revenue."""
    # Group by product
    totals: dict[str, float] = {}
    for sale in sales:
        product = sale["product"]
        totals[product] = totals.get(product, 0.0) + sale["revenue"]
    # Convert to list of dicts and sort
    ranked = [
        {"product": p, "total_revenue": r}
        for p, r in totals.items()
    ]
    return sorted(ranked, key=lambda x: x["total_revenue"], reverse=True)[:n]
# results = top_products_by_revenue(sales)
# for r in results:
#     print(f"{r['product']:<20} ${r['total_revenue']:>10,.2f}")
4.3 Grouping and Summarizing by Date
from collections import defaultdict
def daily_summary(sales: list[dict]) -> list[dict]:
    """Return total units and revenue per date, sorted by date."""
    by_date: dict[str, dict] = defaultdict(lambda: {"units": 0, "revenue": 0.0})
    for sale in sales:
        day = sale["date"]
        by_date[day]["units"] += sale["units"]
        by_date[day]["revenue"] += sale["revenue"]
    return [
        {"date": d, "units": v["units"], "revenue": v["revenue"]}
        for d, v in sorted(by_date.items())
    ]
# summary = daily_summary(sales)
# for row in summary:
#     print(f"{row['date']} units={row['units']} revenue=${row['revenue']:.2f}")
4.4 Loading and Processing JSON Records
Suppose you have a products.json file:
[
{"id": 1, "name": "Widget A", "price": 20.00, "in_stock": true},
{"id": 2, "name": "Widget B", "price": 40.00, "in_stock": false},
{"id": 3, "name": "Gadget X", "price": 200.00, "in_stock": true}
]
Load and filter it:
import json
from pathlib import Path
def load_products(path: str | Path) -> list[dict]:
    with Path(path).open(encoding="utf-8") as f:
        return json.load(f)
def available_products(products: list[dict]) -> list[dict]:
    """Return only in-stock products, sorted by price."""
    in_stock = [p for p in products if p.get("in_stock")]
    return sorted(in_stock, key=lambda p: p["price"])
# products = load_products("products.json")
# for p in available_products(products):
#     print(f"{p['name']:<20} ${p['price']:.2f}")
4.5 Computing Aggregates Across Groups
from collections import defaultdict
def department_stats(employees: list[dict]) -> list[dict]:
    """Return salary statistics per department."""
    by_dept: dict[str, list[float]] = defaultdict(list)
    for emp in employees:
        if emp.get("salary") is not None:
            by_dept[emp["department"]].append(emp["salary"])
    result = []
    for dept, salaries in sorted(by_dept.items()):
        result.append({
            "department": dept,
            "count": len(salaries),
            "avg_salary": sum(salaries) / len(salaries),
            "min_salary": min(salaries),
            "max_salary": max(salaries),
        })
    return result
# stats = department_stats(employees)
# for row in stats:
#     print(
#         f"{row['department']:<15} "
#         f"n={row['count']} "
#         f"avg={row['avg_salary']:,.0f} "
#         f"min={row['min_salary']:,.0f} "
#         f"max={row['max_salary']:,.0f}"
#     )
4.6 A Complete Data Pipeline
Here is a self-contained pipeline that loads a CSV, cleans it, filters it, groups it, summarizes it, and writes the result to a new CSV — all using only the standard library.
"""
pipeline.py — a complete data processing pipeline.
Input: employees.csv (name, department, salary, active)
Output: dept_summary.csv (department, count, avg_salary, min_salary, max_salary)
"""
import csv
from collections import defaultdict
from pathlib import Path
# ── Loading ──────────────────────────────────────────────────────────────────
def load_csv(path: Path) -> list[dict]:
    with path.open(encoding="utf-8", newline="") as f:
        return list(csv.DictReader(f))
# ── Cleaning ─────────────────────────────────────────────────────────────────
def parse_float(value: str, default: float = 0.0) -> float:
    try:
        return float(value.strip().replace(",", ""))
    except (ValueError, AttributeError):
        return default
def clean_employee(raw: dict) -> dict:
    return {
        "name": raw.get("name", "").strip(),
        "department": raw.get("department", "").strip(),
        "salary": parse_float(raw.get("salary", "")),
        "active": raw.get("active", "").strip().lower() == "true",
    }
# ── Filtering ─────────────────────────────────────────────────────────────────
def active_only(employees: list[dict]) -> list[dict]:
    return [e for e in employees if e["active"]]
# ── Grouping ──────────────────────────────────────────────────────────────────
def group_by_department(employees: list[dict]) -> dict[str, list[dict]]:
    groups: dict[str, list[dict]] = defaultdict(list)
    for emp in employees:
        groups[emp["department"]].append(emp)
    return dict(groups)
# ── Aggregation ───────────────────────────────────────────────────────────────
def dept_summary(dept: str, members: list[dict]) -> dict:
    salaries = [m["salary"] for m in members]
    return {
        "department": dept,
        "count": len(salaries),
        "avg_salary": round(sum(salaries) / len(salaries), 2) if salaries else 0,
        "min_salary": min(salaries) if salaries else 0,
        "max_salary": max(salaries) if salaries else 0,
    }
# ── Writing ───────────────────────────────────────────────────────────────────
def save_csv(records: list[dict], path: Path) -> None:
    if not records:
        return
    with path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=list(records[0].keys()))
        writer.writeheader()
        writer.writerows(records)
# ── Pipeline ──────────────────────────────────────────────────────────────────
def run(input_path: Path, output_path: Path) -> None:
    raw = load_csv(input_path)
    employees = [clean_employee(r) for r in raw]
    active = active_only(employees)
    by_dept = group_by_department(active)
    summaries = [
        dept_summary(dept, members)
        for dept, members in sorted(by_dept.items())
    ]
    save_csv(summaries, output_path)
    print(f"Wrote {len(summaries)} department summaries to {output_path}.")
if __name__ == "__main__":
    run(Path("employees.csv"), Path("dept_summary.csv"))
Each step is a small, named function. You can test each one independently, swap out the data source, or add a new step without touching the others.
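To make "test each one independently" concrete, here is a minimal sketch that checks the filtering step with plain assert statements. The test function name and sample rows are hypothetical; active_only is repeated from the pipeline so the block runs on its own.

```python
def active_only(employees: list[dict]) -> list[dict]:
    return [e for e in employees if e["active"]]

def test_active_only() -> None:
    # Hypothetical sample rows, already cleaned.
    sample = [
        {"name": "Alice", "active": True},
        {"name": "Bob", "active": False},
    ]
    # Only the active record survives.
    assert active_only(sample) == [{"name": "Alice", "active": True}]
    # An empty input gives an empty output.
    assert active_only([]) == []
    # The input list itself is left unchanged.
    assert len(sample) == 2

test_active_only()
print("active_only: all checks passed")
```

Because the step takes a list and returns a list, the test needs no files and no other pipeline stage.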
4.7 Searching and Deduplicating Records
def find_by_field(
    records: list[dict],
    field: str,
    value: object,
) -> list[dict]:
    """Return all records where record[field] == value."""
    return [r for r in records if r.get(field) == value]
def deduplicate_by(records: list[dict], key: str) -> list[dict]:
    """Return records with duplicate key values removed (keeps first occurrence)."""
    seen: set = set()
    result = []
    for record in records:
        k = record.get(key)
        if k not in seen:
            seen.add(k)
            result.append(record)
    return result
# engineers = find_by_field(employees, "department", "Engineering")
# unique_by_name = deduplicate_by(employees, "name")
5. Common Mistakes
5.1 Forgetting That CSV Values Are Always Strings
csv.DictReader returns every value as a string, even if the CSV contains
numbers. Using + on those strings concatenates them instead of adding, so the
result is silently wrong.
import csv, io
data = "name,score\nAlice,88\nBob,92\n"
reader = csv.DictReader(io.StringIO(data))
rows = list(reader)
# Wrong — string concatenation, not addition
total = rows[0]["score"] + rows[1]["score"]
print(total) # '8892' — not 180!
# Correct — convert first
total = int(rows[0]["score"]) + int(rows[1]["score"])
print(total) # 180
Always convert numeric fields to int or float right after loading.
5.2 Not Handling Missing or Empty Fields
If a field is missing or blank, int("") and float("") raise ValueError.
Wrap conversions in a helper that returns a sensible default.
# Crashes on empty string
value = int("") # ValueError: invalid literal for int() with base 10: ''
# Safe
def to_int(s: str, default: int = 0) -> int:
    try:
        return int(s.strip())
    except (ValueError, AttributeError):
        return default
5.3 Mutating Records While Iterating
Modifying a list while iterating over it skips items or causes unexpected behavior. Build a new list instead.
records = [{"score": 50}, {"score": 80}, {"score": 30}]
# Wrong — modifying the list while iterating
for r in records:
    if r["score"] < 60:
        records.remove(r)  # skips items!
# Correct — build a new list
records = [r for r in records if r["score"] >= 60]
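The skipping is easiest to see when two items that should be removed sit next to each other. The sketch below uses a made-up list of plain scores: removing 50 shifts 30 into the slot the loop has already passed, so 30 is never examined.

```python
scores = [50, 30, 80]

# Wrong: removing during iteration shifts later items to the left,
# so the item right after a removed one is never visited.
for s in scores:
    if s < 60:
        scores.remove(s)

print(scores)  # [30, 80]  (30 survived even though it is below 60)

# Correct: build a new list from the original data.
kept = [s for s in [50, 30, 80] if s >= 60]
print(kept)  # [80]
```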
5.4 Using a Plain dict Instead of defaultdict for Grouping
Without defaultdict, you must check whether a key exists before appending.
Forgetting the check raises a KeyError.
# Fragile: KeyError if key is new
groups = {}
for record in records:
    groups[record["dept"]].append(record)  # KeyError on first new dept
# Correct option 1: setdefault
groups = {}
for record in records:
    groups.setdefault(record["dept"], []).append(record)
# Correct option 2: defaultdict
from collections import defaultdict
groups = defaultdict(list)
for record in records:
    groups[record["dept"]].append(record)
5.5 Sorting Strings That Should Be Numbers
If you forget to convert a numeric field, sorted() sorts it
lexicographically (as text), which gives the wrong order.
records = [
    {"name": "A", "score": "9"},
    {"name": "B", "score": "10"},
    {"name": "C", "score": "2"},
]
# Wrong — lexicographic order: "10" < "2" < "9"
wrong = sorted(records, key=lambda r: r["score"])
print([r["score"] for r in wrong]) # ['10', '2', '9']
# Correct — numeric order
right = sorted(records, key=lambda r: int(r["score"]))
print([r["score"] for r in right]) # ['2', '9', '10']
5.6 Assuming json.load Always Returns a List
A JSON file might contain a single object {} instead of an array [].
Check the type before treating it as a list of records.
import json
from pathlib import Path
def load_records(path: str | Path) -> list[dict]:
    with Path(path).open(encoding="utf-8") as f:
        data = json.load(f)
    if isinstance(data, list):
        return data
    if isinstance(data, dict):
        return [data]  # wrap single object in a list
    raise ValueError(f"Unexpected JSON structure in {path}: {type(data)}")
5.7 Building Large Strings with + in a Loop
When generating a report or output string, concatenating with + inside a
loop can create a new string object on every iteration. Collect the pieces in
a list and combine them once with str.join() instead.
records = [{"name": "Alice", "score": 88}, {"name": "Bob", "score": 92}]
# Slow for large data
output = ""
for r in records:
    output += f"{r['name']}: {r['score']}\n"
# Better (note: this version has no trailing newline after the last line)
lines = [f"{r['name']}: {r['score']}" for r in records]
output = "\n".join(lines)
6. Practice Tasks
- Write a function load_and_clean_csv(path: str | Path) -> list[dict] that reads a CSV file, strips whitespace from all string fields, and returns the cleaned records.
- Given a list of dicts with "name" and "score" keys (scores as strings), write a function top_scorers(records: list[dict], n: int) -> list[dict] that returns the top n records sorted by score descending. Convert scores to integers before sorting.
- Write a function count_by(records: list[dict], field: str) -> dict[str, int] that returns a dict mapping each unique value of field to the number of records with that value.
- Write a function average_field(records: list[dict], field: str) -> float | None that returns the average of a numeric field across all records, or None if there are no valid values.
- Write a function filter_range(records: list[dict], field: str, low: float, high: float) -> list[dict] that returns records where the numeric value of field falls between low and high (inclusive).
- Write a function pivot_count(records: list[dict], row_key: str, col_key: str) -> dict[str, dict[str, int]] that builds a pivot table counting how many records have each combination of row_key and col_key values.
- Write a complete script that reads sales.csv (columns: date, product, units, revenue), computes total revenue per product, and writes the result to product_totals.csv sorted by revenue descending. Use pathlib, type hints, and small named functions.
- Extend the pipeline from section 4.6 to also write a JSON file dept_summary.json alongside the CSV. The JSON should contain the same data as the CSV.
7. Key Takeaways
- Represent tabular data as a list of dicts. Each dict is one row; keys are column names. This structure works directly with csv.DictReader, json.load, and all the patterns in this chapter.
- csv.DictReader returns every value as a string. Always convert numeric fields to int or float right after loading.
- Clean data immediately after loading: strip whitespace, handle empty strings, and convert types. The rest of your code can then trust the data.
- Filter with list comprehensions: [r for r in records if condition].
- Sort with sorted(records, key=lambda r: r["field"]). Use a tuple key to sort by multiple fields. Negate a numeric field to sort it descending.
- Group with defaultdict(list): iterate once, appending each record to the right bucket.
- Aggregate with Python's built-ins: len(), sum(), min(), max(). Average is sum(values) / len(values).
- Keep each step of a pipeline in its own small function. This makes the code testable, readable, and easy to extend.
- Write results back with csv.DictWriter or json.dump. Always pass newline="" for CSV and encoding="utf-8" for both.
What's Next
Ready to continue? Head to the next chapter: Practical Projects.