feat: add write support for KeePass entries and groups

This commit is contained in:
2026-05-10 00:19:42 +02:00
parent 89ba04d61a
commit da0b396bf8
7 changed files with 221 additions and 39 deletions
+6 -5
View File
@@ -1,7 +1,7 @@
# Project # Project
## Goal ## Goal
Provide a small read-only TypeScript wrapper around KeePass `.kdbx` databases using a Python bridge powered by `pykeepass`. Provide a TypeScript wrapper around KeePass `.kdbx` databases using a Python bridge powered by `pykeepass`.
## Architecture ## Architecture
- Public API is TypeScript. - Public API is TypeScript.
@@ -15,8 +15,10 @@ Provide a small read-only TypeScript wrapper around KeePass `.kdbx` databases us
- `KeePassDatabase.listEntries()` - `KeePassDatabase.listEntries()`
- `KeePassDatabase.findEntries(query)` - `KeePassDatabase.findEntries(query)`
- `KeePassDatabase.listGroups()` - `KeePassDatabase.listGroups()`
- `KeePassDatabase.createEntry(entry)`
- `KeePassDatabase.createGroup(group)`
- `KeePassDatabase.save()`
- `KeePassDatabase.close()` is a no-op. - `KeePassDatabase.close()` is a no-op.
## Types ## Types
- Entries expose: `title`, `username`, `password`, `url`, `notes`, optional `groupPath`, optional `otp`. - Entries expose: `title`, `username`, `password`, `url`, `notes`, optional `groupPath`, optional `otp`.
- Groups expose: `name`, `path`. - Groups expose: `name`, `path`.
@@ -33,7 +35,7 @@ Provide a small read-only TypeScript wrapper around KeePass `.kdbx` databases us
- Bundled fixtures: `tests/fixtures/data.kdbx` and `tests/fixtures/empty.kdbx`. - Bundled fixtures: `tests/fixtures/data.kdbx` and `tests/fixtures/empty.kdbx`.
- Companion JSON fixture: `tests/fixtures/data.kdbx.json` stores the password and expected content. - Companion JSON fixture: `tests/fixtures/data.kdbx.json` stores the password and expected content.
- Unit tests in `tests/unit/` mock the child process and validate bridge parsing/error handling. - Unit tests in `tests/unit/` mock the child process and validate bridge parsing/error handling.
- Integration tests in `tests/integration/` use `data.kdbx` to verify entries, groups, partial search, and OTP/TOTP output when `pykeepass` is installed. - Integration tests in `tests/integration/` use `data.kdbx` to verify entries, groups, partial search, OTP/TOTP output, and basic write persistence on a temporary copy when `pykeepass` is installed.
- The integration test runner checks for `pykeepass` and skips cleanly when it is unavailable. - The integration test runner checks for `pykeepass` and skips cleanly when it is unavailable.
## Main scripts ## Main scripts
@@ -43,5 +45,4 @@ Provide a small read-only TypeScript wrapper around KeePass `.kdbx` databases us
- `bun run setup:python` - `bun run setup:python`
## Current direction ## Current direction
Keep improving failure-path coverage and keep the API minimal unless a concrete need appears. Keep improving failure-path coverage, keep write support minimal and predictable, and continue validating persistence on temporary copies.
+24 -2
View File
@@ -1,13 +1,13 @@
# kdbx-lib # kdbx-lib
TypeScript wrapper around `pykeepass` for reading KeePass `.kdbx` files. TypeScript wrapper around `pykeepass` for reading and modifying KeePass `.kdbx` files.
## Overview ## Overview
This project uses: This project uses:
- TypeScript as the public API - TypeScript as the public API
- Python as the runtime backend - Python as the runtime backend
- `pykeepass` to read KeePass databases - `pykeepass` to read and update KeePass databases
The TypeScript layer launches a Python bridge per request and exchanges JSON through stdin/stdout. The TypeScript layer launches a Python bridge per request and exchanges JSON through stdin/stdout.
@@ -74,6 +74,27 @@ Finds entries by partial match and returns the full entry objects.
### `listGroups()` ### `listGroups()`
Returns all groups, including their names and paths. Returns all groups, including their names and paths.
### `createEntry(entry)`
Creates a new entry in the target database and persists it immediately.
#### Entry input
- `title`: entry title
- `username`: optional username
- `password`: optional password
- `url`: optional URL
- `notes`: optional notes
- `groupPath`: optional target group path
### `createGroup(group)`
Creates a new group and persists it immediately.
#### Group input
- `name`: group name
- `path`: optional parent group path
### `save()`
Persists the current database state.
### `close()` ### `close()`
No-op for now. No-op for now.
@@ -83,5 +104,6 @@ No-op for now.
- This is simple and robust for a first version. - This is simple and robust for a first version.
- Errors from the Python bridge are propagated to the TypeScript API, including invalid or empty output. - Errors from the Python bridge are propagated to the TypeScript API, including invalid or empty output.
- A persistent Python process can be added later if needed. - A persistent Python process can be added later if needed.
- Write operations currently open, modify, and save the database per command.
- Bundled fixtures include `tests/fixtures/data.kdbx` and `tests/fixtures/empty.kdbx`; the companion JSON file stores the password and expected content for tests/examples. - Bundled fixtures include `tests/fixtures/data.kdbx` and `tests/fixtures/empty.kdbx`; the companion JSON file stores the password and expected content for tests/examples.
- Integration tests validate the bundled `data.kdbx` entry-by-entry and group-by-group against `tests/fixtures/data.kdbx.json`, and may skip cleanly when `pykeepass` is unavailable. - Integration tests validate the bundled `data.kdbx` entry-by-entry and group-by-group against `tests/fixtures/data.kdbx.json`, and may skip cleanly when `pykeepass` is unavailable.
+18 -5
View File
@@ -3,18 +3,20 @@ import { fileURLToPath } from "node:url";
import type { import type {
KeePassCommand, KeePassCommand,
KeePassEntry, KeePassEntry,
KeePassEntryInput,
KeePassFindQuery, KeePassFindQuery,
KeePassGroup, KeePassGroup,
KeePassGroupInput,
KeePassOpenOptions, KeePassOpenOptions,
KeePassResponse, KeePassResponse,
} from "./types"; } from "./types";
export class KeePassDatabase { export class KeePassDatabase {
constructor( constructor(
private readonly path: string, private path: string,
private readonly options: KeePassOpenOptions, private options: KeePassOpenOptions,
private readonly pythonPath = process.env.PYTHON_PATH ?? ".venv/bin/python3", private pythonPath = process.env.PYTHON_PATH ?? ".venv/bin/python3",
private readonly bridgePath = new URL("./python/bridge.py", import.meta.url) private bridgePath = new URL("./python/bridge.py", import.meta.url)
) {} ) {}
async listEntries(): Promise<KeePassEntry[]> { async listEntries(): Promise<KeePassEntry[]> {
@@ -32,8 +34,19 @@ export class KeePassDatabase {
return response; return response;
} }
async createEntry(entry: KeePassEntryInput): Promise<KeePassEntry> {
return this.run<KeePassEntry>({ command: "create-entry", entry });
}
async createGroup(group: KeePassGroupInput): Promise<KeePassGroup> {
return this.run<KeePassGroup>({ command: "create-group", group });
}
async save(): Promise<void> {
await this.run<void>({ command: "save" });
}
async close(): Promise<void> { async close(): Promise<void> {
// No persistent process is kept alive yet.
return; return;
} }
+70
View File
@@ -60,6 +60,40 @@ def load_db(db_path, password, key_file=None):
return PyKeePass(db_path, password=password) return PyKeePass(db_path, password=password)
def save_db(db):
db.save()
def normalize_path(value):
if value is None:
return ""
return str(value).strip()
def resolve_group_by_path(db, group_path):
normalized = normalize_path(group_path)
if not normalized:
return db.root_group
candidate_paths = [normalized]
if normalized.startswith("/"):
candidate_paths.append(normalized.lstrip("/"))
else:
candidate_paths.append(f"/{normalized}")
for candidate in candidate_paths:
matching_groups = db.find_groups(path=candidate)
if matching_groups:
return matching_groups[0]
matching_groups = db.find_groups(name=normalized.split("/")[-1])
for group in matching_groups:
if path_to_string(group.path).endswith(normalized):
return group
raise ValueError(f"Group not found: {normalized}")
def read_payload(): def read_payload():
if len(sys.argv) > 1 and sys.argv[1].strip(): if len(sys.argv) > 1 and sys.argv[1].strip():
return json.loads(sys.argv[1]) return json.loads(sys.argv[1])
@@ -123,6 +157,42 @@ def main():
emit({"ok": True, "data": groups}) emit({"ok": True, "data": groups})
return 0 return 0
if command == "create-entry":
entry_input = payload.get("entry", {})
title = normalize_path(entry_input.get("title"))
if not title:
emit({"ok": False, "error": "Missing entry title"})
return 1
group = resolve_group_by_path(db, entry_input.get("groupPath"))
created = db.add_entry(
group,
title,
normalize_path(entry_input.get("username")),
normalize_path(entry_input.get("password")),
normalize_path(entry_input.get("url")),
normalize_path(entry_input.get("notes")),
)
save_db(db)
emit({"ok": True, "data": entry_to_dict(created)})
return 0
if command == "create-group":
group_input = payload.get("group", {})
name = normalize_path(group_input.get("name"))
if not name:
emit({"ok": False, "error": "Missing group name"})
return 1
parent = resolve_group_by_path(db, group_input.get("path"))
created_group = db.add_group(parent, name)
save_db(db)
emit({"ok": True, "data": group_to_dict(created_group)})
return 0
if command == "save":
save_db(db)
emit({"ok": True, "data": None})
return 0
emit({"ok": False, "error": f"Unknown command: {command}"}) emit({"ok": False, "error": f"Unknown command: {command}"})
return 1 return 1
+18 -1
View File
@@ -25,10 +25,27 @@ export type KeePassFindQuery = {
groupPath?: string; groupPath?: string;
}; };
export type KeePassEntryInput = {
title: string;
username?: string;
password?: string;
url?: string;
notes?: string;
groupPath?: string;
};
export type KeePassGroupInput = {
name: string;
path?: string;
};
export type KeePassCommand = export type KeePassCommand =
| { command: "list-entries" } | { command: "list-entries" }
| { command: "find-entries"; query: KeePassFindQuery } | { command: "find-entries"; query: KeePassFindQuery }
| { command: "list-groups" }; | { command: "list-groups" }
| { command: "create-entry"; entry: KeePassEntryInput }
| { command: "create-group"; group: KeePassGroupInput }
| { command: "save" };
export type KeePassResponse<T> = export type KeePassResponse<T> =
| { | {
+42 -1
View File
@@ -1,5 +1,8 @@
import { expect, test } from "bun:test"; import { expect, test } from "bun:test";
import { readFile } from "node:fs/promises"; import { copyFile, readFile, rm } from "node:fs/promises";
import { tmpdir } from "node:os";
import { join } from "node:path";
import { randomUUID } from "node:crypto";
import { openKeePassDatabase } from "../../src/keepass"; import { openKeePassDatabase } from "../../src/keepass";
type FixtureEntry = { type FixtureEntry = {
@@ -25,6 +28,16 @@ type FixtureData = {
const FIXTURE_PATH = "tests/fixtures/data.kdbx"; const FIXTURE_PATH = "tests/fixtures/data.kdbx";
const FIXTURE_DATA_PATH = "tests/fixtures/data.kdbx.json"; const FIXTURE_DATA_PATH = "tests/fixtures/data.kdbx.json";
async function withTempCopy<T>(filePath: string, fn: (tempPath: string) => Promise<T>): Promise<T> {
const tempPath = join(tmpdir(), `kdbx-lib-${randomUUID()}.kdbx`);
await copyFile(filePath, tempPath);
try {
return await fn(tempPath);
} finally {
await rm(tempPath, { force: true });
}
}
async function ensurePyKeePass(): Promise<boolean> { async function ensurePyKeePass(): Promise<boolean> {
const python = process.env.PYTHON_PATH ?? ".venv/bin/python3"; const python = process.env.PYTHON_PATH ?? ".venv/bin/python3";
const child = Bun.spawn([python, "-c", "import pykeepass; print('ok')"], { const child = Bun.spawn([python, "-c", "import pykeepass; print('ok')"], {
@@ -125,6 +138,34 @@ test("finds entries in the bundled data fixture", async () => {
expect(entries[0]?.groupPath).toBe("Folder1"); expect(entries[0]?.groupPath).toBe("Folder1");
}); });
test("creates entries in a temporary copy of the bundled fixture and persists them", async () => {
const [{ password }, pykeepassReady] = await Promise.all([
readFile(FIXTURE_DATA_PATH, "utf8").then((raw) => JSON.parse(raw) as FixtureData),
ensurePyKeePass(),
]);
if (!pykeepassReady) {
console.log("Skipping integration test: pykeepass is not installed");
return;
}
await withTempCopy(FIXTURE_PATH, async (tempPath) => {
const db = openKeePassDatabase(tempPath, { password });
const createdEntry = await db.createEntry({
title: "TempEntry",
username: "temp-user",
password: "temp-pass",
});
expect(createdEntry.title).toBe("TempEntry");
expect(createdEntry.username).toBe("temp-user");
const persisted = await db.findEntries({ title: "TempEntry" });
expect(persisted).toHaveLength(1);
expect(persisted[0]?.username).toBe("temp-user");
});
});
test("uses the JSON fixture content as the source of truth for expectations", async () => { test("uses the JSON fixture content as the source of truth for expectations", async () => {
const { content } = JSON.parse(await readFile(FIXTURE_DATA_PATH, "utf8")) as FixtureData; const { content } = JSON.parse(await readFile(FIXTURE_DATA_PATH, "utf8")) as FixtureData;
+43 -25
View File
@@ -9,33 +9,23 @@ mock.module("node:child_process", () => ({
spawn: spawnMock, spawn: spawnMock,
})); }));
function mockSuccessfulBridgeResponse(data: unknown) {
spawnMock.mockImplementation(() => {
const child = {
stdin: { write: () => undefined, end: () => undefined },
stdout: { on: (_event: string, cb: (chunk: Buffer | string) => void) => cb(JSON.stringify({ ok: true, data })) },
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
}
describe("KeePassDatabase", () => { describe("KeePassDatabase", () => {
test("listEntries parses successful bridge response", async () => { test("listEntries parses successful bridge response", async () => {
spawnMock.mockImplementation(() => { mockSuccessfulBridgeResponse([{ title: "Entry" }]);
const listeners: Record<string, ((chunk: Buffer | string) => void)[]> = {};
const child = {
stdin: {
write: () => undefined,
end: () => {
listeners.close?.forEach((cb) => cb(Buffer.from("")));
},
},
stdout: {
on: (event: string, cb: (chunk: Buffer | string) => void) => {
listeners[event] ??= [];
listeners[event].push(cb);
if (event === "data") {
cb(JSON.stringify({ ok: true, data: [{ title: "Entry" }] }));
}
},
},
stderr: { on: () => undefined },
on: (event: string, cb: (code?: number | null) => void) => {
if (event === "close") queueMicrotask(() => cb(0));
},
};
return child as never;
});
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py")); const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
const entries = await db.listEntries(); const entries = await db.listEntries();
@@ -60,4 +50,32 @@ describe("KeePassDatabase", () => {
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py")); const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await expect(db.listEntries()).rejects.toThrow("boom"); await expect(db.listEntries()).rejects.toThrow("boom");
}); });
test("createEntry forwards the create-entry command", async () => {
mockSuccessfulBridgeResponse({ title: "New" });
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
const created = await db.createEntry({ title: "New" });
expect(created).toEqual({ title: "New" });
expect(spawnMock).toHaveBeenCalled();
});
test("createGroup forwards the create-group command", async () => {
mockSuccessfulBridgeResponse({ name: "Folder", path: "" });
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
const created = await db.createGroup({ name: "Folder" });
expect(created).toEqual({ name: "Folder", path: "" });
expect(spawnMock).toHaveBeenCalled();
});
test("save forwards the save command", async () => {
mockSuccessfulBridgeResponse(null);
const db = new KeePassDatabase("db.kdbx", { password: "secret" }, "python3", new URL("file:///tmp/bridge.py"));
await expect(db.save()).resolves.toBeUndefined();
expect(spawnMock).toHaveBeenCalled();
});
}); });