Selfie
Loading...
Searching...
No Matches
plugin.py
Go to the documentation of this file.
1import os
2from collections import defaultdict
3from collections.abc import ByteString, Iterator
4from typing import Optional
5
6import pytest
7from selfie_lib import (
8 FS,
9 ArrayMap,
10 ArraySet,
11 CallStack,
12 CommentTracker,
13 DiskStorage,
14 DiskWriteTracker,
15 InlineWriteTracker,
16 LiteralValue,
17 Mode,
18 Snapshot,
19 SnapshotFile,
20 SnapshotFileLayout,
21 SnapshotSystem,
22 SnapshotValueReader,
23 SourceFile,
24 TypedPath,
25 WithinTestGC,
26 _clearSelfieSystem,
27 _initSelfieSystem,
28)
29from selfie_lib.Atomic import AtomicReference
30from selfie_lib.WriteTracker import ToBeFileWriteTracker
31
32from .SelfieSettingsAPI import SelfieSettingsAPI
33
34
def assert_failed(self, message, expected=None, actual=None) -> Exception:
    """Build the exception that reports a failed assertion.

    When neither ``expected`` nor ``actual`` is given, a plain
    ``AssertionError(message)`` is returned. Otherwise both values are
    rendered as strings and handed to the comparison assertion.
    """
    if expected is None and actual is None:
        return AssertionError(message)

    rendered_expected = self.__nullable_to_string(expected, "")
    rendered_actual = self.__nullable_to_string(actual, "")

    one_side_is_none = expected is None or actual is None
    if rendered_expected or rendered_actual or not one_side_is_none:
        return self.__comparison_assertion(
            message, rendered_expected, rendered_actual
        )

    # Both sides rendered as empty strings although one of them is None:
    # render again with a visible placeholder for the None side.
    placeholder = "(null)"
    return self.__comparison_assertion(
        message,
        self.__nullable_to_string(expected, placeholder),
        self.__nullable_to_string(actual, placeholder),
    )
52
def __nullable_to_string(self, value, on_null: str) -> str:
    """Return ``str(value)``, or ``on_null`` when ``value`` is ``None``."""
    if value is None:
        return on_null
    return str(value)
55
def __comparison_assertion(self, message: str, expected: str, actual: str) -> Exception:
    """Assert that ``expected`` equals ``actual``, returning a fallback error.

    The ``assert`` statement is intentional: it is meant to raise in a way
    that a pytest runner can display nicely. If it does not raise, an
    ``AssertionError`` carrying ``message`` is returned to the caller.
    """
    # this *should* raise, and a good pytest runner will render it nicely
    assert expected == actual, message
    # fallback for when the assert above did not raise
    return AssertionError(message)
63
64
66 def __init__(self, fs: FSImplementation, settings: SelfieSettingsAPI):
67 super().__init__(fs)
68 self.__settings = settings
69 self.__root_folder = TypedPath.of_folder(os.path.abspath(settings.root_dir))
71
def root_folder(self) -> TypedPath:
    """Return the root folder stored on this layout."""
    return self.__root_folder
74
def snapshotfile_for_testfile(self, testfile: TypedPath) -> TypedPath:
    """Map a ``.py`` test file to the ``.ss`` snapshot file in the same folder.

    Raises:
        ValueError: if the name of ``testfile`` does not end in ``.py``.
    """
    filename = testfile.name
    if not filename.endswith(".py"):
        raise ValueError(f"Unknown file extension, expected .py: {filename}")
    stem = filename[:-3]  # drop the trailing ".py"
    return testfile.parent_folder().resolve_file(f"{stem}.ss")
80
def __infer_default_line_ending_is_unix(self) -> bool:
    """Guess the line-ending style from the files under the root folder.

    The first readable file that contains a newline decides the answer:
    True when it contains no carriage return. Returns True when no such
    file is found.
    """

    def first_newline_file_is_unix(candidates: Iterator[TypedPath]) -> bool:
        for candidate in candidates:
            try:
                content = self.fs.file_read(candidate)
            except UnicodeDecodeError:  # noqa: PERF203
                # might be a binary file that throws an encoding exception
                continue
            # only a file that has a newline somewhere in it is informative
            if "\n" in content:
                return "\r" not in content
        return True  # if we didn't find any files, assume unix

    return self.fs.file_walk(self.__root_folder, first_newline_file_is_unix)
95
96
97@pytest.hookimpl
99 session: pytest.Session, config: pytest.Config, items: list[pytest.Item]
100) -> None:
101 settings = SelfieSettingsAPI(config)
102 system = PytestSnapshotSystem(settings)
103 session.selfie_system = system # type: ignore
104 _initSelfieSystem(system)
105 for item in items:
106 (file, _, testname) = item.reportinfo()
107 system.planning_to_run(TypedPath.of_file(os.path.abspath(file)), testname)
108
109
@pytest.hookimpl
def pytest_sessionfinish(session: pytest.Session, exitstatus):  # noqa: ARG001
    """Notify the session's snapshot system that all tests finished, then clear it."""
    snapshot_system: PytestSnapshotSystem = session.selfie_system  # type: ignore
    snapshot_system.finished_all_tests()
    _clearSelfieSystem(snapshot_system)
115
116
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_protocol(item: pytest.Item, nextitem: Optional[pytest.Item]):  # noqa: ARG001
    """Wrap the run of ``item`` with test_start / test_finish notifications."""
    reported_file, _, testname = item.reportinfo()
    testfile = TypedPath.of_file(os.path.abspath(reported_file))
    snapshot_system: PytestSnapshotSystem = item.session.selfie_system  # type: ignore

    snapshot_system.test_start(testfile, testname)
    yield  # the wrapped hook implementations run here
    snapshot_system.test_finish(testfile, testname)
126
127
@pytest.hookimpl
def pytest_runtest_makereport(call: pytest.CallInfo[None], item: pytest.Item):
    """Report a failure to the snapshot system when "call" or "teardown" raised."""
    if call.excinfo is None or call.when not in ("call", "teardown"):
        return

    snapshot_system: PytestSnapshotSystem = item.session.selfie_system  # type: ignore
    reported_file, _, testname = item.reportinfo()
    testfile = TypedPath.of_file(os.path.abspath(reported_file))
    snapshot_system.test_failed(testfile, testname)
137
138
class _keydefaultdict(defaultdict):
    """A ``defaultdict`` variant whose ``default_factory`` receives the missing key."""

    def __missing__(self, key):
        """Create, store and return the value for ``key`` via ``default_factory(key)``.

        Raises:
            KeyError: if no ``default_factory`` is set.
        """
        factory = self.default_factory
        if factory is None:
            raise KeyError(key)
        value = factory(key)  # type: ignore
        self[key] = value
        return value
148
149
151 def __init__(self, settings: SelfieSettingsAPI):
152 self.__fs = FSImplementation()
153 self.__mode = settings.calc_mode()
158
159 self.__progress_per_file: defaultdict[TypedPath, SnapshotFileProgress] = (
160 _keydefaultdict(lambda key: SnapshotFileProgress(self, key)) # type: ignore
161 ) # type: ignore
162 # the test which is running right now, if any
163 self.__in_progress: Optional[SnapshotFileProgress] = None
164 # double-checks that we don't have any tests in progress
165 self.check_for_invalid_state: AtomicReference[Optional[ArraySet[TypedPath]]] = (
166 AtomicReference(ArraySet.empty())
167 )
168
def planning_to_run(self, testfile: TypedPath, testname: str):  # noqa: ARG002
    """Count one more expected test finish for ``testfile``."""
    self.__progress_per_file[testfile].finishes_expected += 1
172
def mark_path_as_written(self, path: TypedPath):
    """Add ``path`` to the tracked set of written paths.

    Raises:
        RuntimeError: if the tracked set has already been replaced by None.
    """

    def add_path(written: Optional[ArraySet[TypedPath]]):
        if written is not None:
            return written.plusOrThis(path)
        raise RuntimeError(
            "Snapshot file is being written after all tests were finished."
        )

    self.check_for_invalid_state.update_and_get(add_path)
182
def test_start(self, testfile: TypedPath, testname: str):
    """Make ``testfile`` the file in progress and start ``testname`` on it.

    Raises:
        RuntimeError: if another test is already in progress.
    """
    running = self.__in_progress
    if running:
        raise RuntimeError(
            f"Test already in progress. {running.test_file} is running, can't start {testfile}"
        )
    progress = self.__progress_per_file[testfile]
    self.__in_progress = progress
    progress.test_start(testname)
190
def test_failed(self, testfile: TypedPath, testname: str):
    """Forward the failure of ``testname`` to the progress object in progress."""
    self.__assert_inprogress(testfile)
    current = self.__in_progress
    current.test_failed(testname)  # type: ignore
194
def test_finish(self, testfile: TypedPath, testname: str):
    """Finish ``testname`` on the progress object in progress, then clear it."""
    self.__assert_inprogress(testfile)
    current = self.__in_progress
    current.test_finish(testname)  # type: ignore
    self.__in_progress = None
199
def __assert_inprogress(self, testfile: TypedPath):
    """Check that the progress object in progress belongs to ``testfile``.

    Raises:
        RuntimeError: if nothing is in progress, or a different file is.
    """
    current = self.__in_progress
    if current is None:
        raise RuntimeError("No test in progress")
    if current.test_file != testfile:
        raise RuntimeError(
            f"{current.test_file} is in progress, can't accept data for {testfile}."
        )
207
209 snapshotsFilesWrittenToDisk = self.check_for_invalid_state.get_and_update(
210 lambda _: None
211 )
212 if snapshotsFilesWrittenToDisk is None:
213 raise RuntimeError("finished_all_tests() was called more than once.")
214
215 if self.modemode != Mode.readonly:
216 if self.__inline_write_tracker.hasWrites():
218
219 for path in self.__comment_tracker.paths_with_once():
220 source = SourceFile(path.name, self.fsfs.file_read(path))
221 source.remove_selfie_once_comments()
222 self.fsfs.file_write(path, source.as_string)
223
@property
def mode(self) -> Mode:
    """The mode stored on this system."""
    return self.__mode
227
@property
def fs(self) -> FS:
    """The filesystem implementation stored on this system."""
    return self.__fs
231
@property
def layout(self) -> SnapshotFileLayout:
    """The snapshot file layout; this is the ``layout_pytest`` attribute."""
    return self.layout_pytest
235
def disk_thread_local(self) -> DiskStorage:
    """Return a ``DiskStoragePytest`` bound to the test currently in progress.

    Raises:
        RuntimeError: if no progress object or no test name is in progress.
    """
    progress = self.__in_progress
    testname = None if progress is None else progress.testname_in_progress
    if testname is None:
        raise RuntimeError("No test in progress")
    return DiskStoragePytest(progress, testname)
245
def source_file_has_writable_comment(self, call: CallStack) -> bool:
    """Return what the comment tracker's ``hasWritableComment`` reports for ``call``.

    The layout passed along is this system's ``layout`` property.
    """
    # `self.layoutlayoutlayout` is not an attribute of this class; the
    # property that exists is `layout`.
    return self.__comment_tracker.hasWritableComment(call, self.layout)
248
def write_inline(self, literal_value: LiteralValue, call: CallStack):
    """Record ``literal_value`` for ``call`` with the inline write tracker.

    The layout passed along is this system's ``layout`` property.
    """
    # `self.layoutlayoutlayout` is not an attribute of this class; the
    # property that exists is `layout`.
    self.__inline_write_tracker.record(literal_value, call, self.layout)
251
253 self, path: TypedPath, data: "ByteString", call: CallStack
254 ) -> None:
255 # Directly write to disk using ToBeFileWriteTracker
256 self.__toBeFileWriteTracker.writeToDisk(
257 path, bytes(data), call, self.layout_pytest
258 )
259
260
def __init__(self, progress: "SnapshotFileProgress", testname: str):
    """Bind this storage to ``testname`` and the given progress object."""
    self._testname = testname
    self.__progress = progress
265
def read_disk(self, sub: str, call: "CallStack") -> Optional["Snapshot"]:  # noqa: ARG002
    """Read the snapshot for this test and ``sub`` from the progress object."""
    suffix = self._suffix(sub)
    return self.__progress.read(self._testname, suffix)
268
def write_disk(self, actual: "Snapshot", sub: str, call: "CallStack"):
    """Write ``actual`` for this test and ``sub`` through the progress object."""
    progress = self.__progress
    suffix = self._suffix(sub)
    progress.write(self._testname, suffix, actual, call, progress.system.layout)
277
def keep(self, sub_or_keep_all: Optional[str]):
    """Call ``keep`` on the progress object for this test.

    A falsy ``sub_or_keep_all`` is passed on as ``None``; otherwise its
    suffix form is passed.
    """
    if sub_or_keep_all:
        suffix = self._suffix(sub_or_keep_all)
    else:
        suffix = None
    self.__progress.keep(self._testname, suffix)
282
def _suffix(self, sub: str) -> str:
    """Return ``sub`` prefixed with ``/``, or an empty string when ``sub`` is empty."""
    if not sub:
        return ""
    return f"/{sub}"
285
286
288 TERMINATED = ArrayMap.empty().plus(" ~ / f!n1shed / ~ ", WithinTestGC())
289
290 def __init__(self, system: PytestSnapshotSystem, test_file: TypedPath):
291 self.system = system
292 # the test file which holds the test case which we are the snapshot file for
293 self.test_file = test_file
294
295 # before the tests run, we find out how many we expect to happen
297 # while the tests run, we count up until they have all run, and then we can cleanup
299 # have any tests failed?
300 self.has_failed = False
301
302 # lazy-loaded snapshot file
303 self.file: Optional[SnapshotFile] = None
304 self.tests: AtomicReference[ArrayMap[str, WithinTestGC]] = AtomicReference(
305 ArrayMap.empty()
306 )
307 self.disk_write_tracker: Optional[DiskWriteTracker] = DiskWriteTracker()
308 # the test name which is currently in progress, if any
309 self.testname_in_progress: Optional[str] = None
311
313 if self.tests.get() == SnapshotFileProgress.TERMINATED:
314 raise RuntimeError(
315 "Cannot call methods on a terminated SnapshotFileProgress"
316 )
317
318 def test_start(self, testname: str):
319 if "/" in testname:
320 raise ValueError(f"Test name cannot contain '/', was {testname}")
322 if self.testname_in_progress is not None:
323 raise RuntimeError(
324 f"Cannot start a new test {testname}, {self.testname_in_progress} is already in progress"
325 )
326 self.testname_in_progress = testname
327 self.tests.update_and_get(lambda it: it.plus_or_noop(testname, WithinTestGC()))
328
def test_failed(self, testname: str):
    """Flag this file as failed and call ``keep_all()`` on the entry for ``testname``."""
    self.__assert_in_progress(testname)
    self.has_failed = True
    entry = self.tests.get()[testname]
    entry.keep_all()
333
334 def test_finish(self, testname: str):
335 self.__assert_in_progress(testname)
336 self.finishes_so_far += 1
337 self.testname_in_progress = None
338 if self.finishes_so_far == self.finishes_expected:
340
341 def __assert_in_progress(self, testname: str):
343 if self.testname_in_progress is None:
344 raise RuntimeError("Can't finish, no test was in progress!")
345 if self.testname_in_progress != testname:
346 raise RuntimeError(
347 f"Can't finish {testname}, {self.testname_in_progress} was in progress"
348 )
349
350 def __all_tests_finished(self):
352 self.disk_write_tracker = None # don't need this anymore
353 tests = self.tests.get_and_update(lambda _: SnapshotFileProgress.TERMINATED)
354 if tests == SnapshotFileProgress.TERMINATED:
355 raise ValueError(f"Snapshot for {self.test_file} already terminated!")
356 if self.file is not None:
357 stale_snapshot_indices = []
358 # TODO: figure out GC # noqa: TD002, FIX002, TD003
359 # stale_snapshot_indices = WithinTestGC.find_stale_snapshots_within(self.file.snapshots, tests, find_test_methods_that_didnt_run(self.test_file, tests)) # noqa: ERA001
360 if stale_snapshot_indices or self.file.was_set_at_test_time:
361 self.file.remove_all_indices(stale_snapshot_indices)
362 snapshot_path = self.system.layout_pytest.snapshotfile_for_testfile(
363 self.test_file
364 )
365 if not self.file.snapshots:
367 else:
368 self.system.mark_path_as_written(
369 self.system.layout_pytest.snapshotfile_for_testfile(
370 self.test_file
371 )
372 )
373 os.makedirs(
374 os.path.dirname(snapshot_path.absolute_path), exist_ok=True
375 )
376 with open(
377 snapshot_path.absolute_path, "w", encoding="utf-8"
378 ) as writer:
379 filecontent = []
380 self.file.serialize(filecontent)
381 for line in filecontent:
382 writer.write(line)
383 else:
384 # we never read or wrote to the file
385 every_test_in_class_ran = not any(
387 )
388 is_stale = (
389 every_test_in_class_ran
390 and not self.has_failed
391 and all(it.succeeded_and_used_no_snapshots() for it in tests.values())
392 )
393 if is_stale:
394 snapshot_file = self.system.layout_pytest.snapshotfile_for_testfile(
395 self.test_file
396 )
398 # now that we are done, allow our contents to be GC'ed
399 self.file = None
400
401 def keep(self, test: str, suffix_or_all: Optional[str]):
403 if suffix_or_all is None:
404 self.tests.get()[test].keep_all()
405 else:
406 self.tests.get()[test].keep_suffix(suffix_or_all)
407
408 def write(
409 self,
410 test: str,
411 suffix: str,
412 snapshot: Snapshot,
413 call_stack: CallStack,
414 layout: SnapshotFileLayout,
415 ):
417 key = f"{test}{suffix}"
418 self.disk_write_tracker.record(key, snapshot, call_stack, layout) # type: ignore
419 self.tests.get()[test].keep_suffix(suffix)
420 self.read_file().set_at_test_time(key, snapshot)
421
422 def read(self, test: str, suffix: str) -> Optional[Snapshot]:
424 snapshot = self.read_file().snapshots.get(f"{test}{suffix}")
425 if snapshot is not None:
426 self.tests.get()[test].keep_suffix(suffix)
427 return snapshot
428
def read_file(self) -> SnapshotFile:
    """Return the snapshot file for this test file, loading it on first use.

    If a regular file exists at the snapshot path it is read as bytes and
    parsed. Otherwise an empty snapshot file is created using the layout's
    ``unix_newlines`` value. The result is kept in ``self.file``.
    """
    if self.file is not None:
        return self.file

    layout = self.system.layout_pytest
    location = layout.snapshotfile_for_testfile(self.test_file).absolute_path
    if os.path.exists(location) and os.path.isfile(location):
        with open(location, "rb") as stream:
            raw = stream.read()
        self.file = SnapshotFile.parse(SnapshotValueReader.of_binary(raw))
    else:
        self.file = SnapshotFile.create_empty_with_unix_newlines(
            layout.unix_newlines
        )
    return self.file
445
446
def delete_file_and_parent_dir_if_empty(snapshot_file: TypedPath):
    """Delete ``snapshot_file`` and, if that empties its parent folder, the folder too.

    Does nothing when ``snapshot_file`` is not an existing regular file.
    """
    target = snapshot_file.absolute_path
    if not os.path.isfile(target):
        return
    os.remove(target)
    # if the parent folder is now empty, delete it
    containing_dir = os.path.dirname(target)
    if not os.listdir(containing_dir):
        os.rmdir(containing_dir)
454
455
457 testfile: TypedPath, # noqa: ARG001
458 tests: ArrayMap[str, WithinTestGC], # noqa: ARG001
459) -> ArrayMap[str, WithinTestGC]:
460 # Implementation of finding test methods that didn't run
461 # You can replace this with your own logic based on the class_name and tests dictionary
462 return ArrayMap.empty()
463
464
466 group = parser.getgroup("selfie")
467 group.addoption(
468 "--foo",
469 action="store",
470 dest="dest_foo",
471 default="2024",
472 help='Set the value for the fixture "bar".',
473 )
474
475 parser.addini("HELLO", "Dummy pytest.ini setting")
476
477
@pytest.fixture
def bar(request):
    """Fixture returning the ``dest_foo`` option value from the pytest config."""
    options = request.config.option
    return options.dest_foo
keep(self, Optional[str] sub_or_keep_all)
Definition plugin.py:278
write_disk(self, "Snapshot" actual, str sub, "CallStack" call)
Definition plugin.py:269
__init__(self, "SnapshotFileProgress" progress, str testname)
Definition plugin.py:262
Optional["Snapshot"] read_disk(self, str sub, "CallStack" call)
Definition plugin.py:266
Exception __comparison_assertion(self, str message, str expected, str actual)
Definition plugin.py:58
str __nullable_to_string(self, value, str on_null)
Definition plugin.py:53
Exception assert_failed(self, message, expected=None, actual=None)
Definition plugin.py:36
__init__(self, FSImplementation fs, SelfieSettingsAPI settings)
Definition plugin.py:66
TypedPath snapshotfile_for_testfile(self, TypedPath testfile)
Definition plugin.py:75
mark_path_as_written(self, TypedPath path)
Definition plugin.py:173
bool source_file_has_writable_comment(self, CallStack call)
Definition plugin.py:246
SnapshotFileLayout layout(self)
Definition plugin.py:233
test_finish(self, TypedPath testfile, str testname)
Definition plugin.py:195
None write_to_be_file(self, TypedPath path, "ByteString" data, CallStack call)
Definition plugin.py:254
planning_to_run(self, TypedPath testfile, str testname)
Definition plugin.py:169
__init__(self, SelfieSettingsAPI settings)
Definition plugin.py:151
write_inline(self, LiteralValue literal_value, CallStack call)
Definition plugin.py:249
test_start(self, TypedPath testfile, str testname)
Definition plugin.py:183
__assert_inprogress(self, TypedPath testfile)
Definition plugin.py:200
test_failed(self, TypedPath testfile, str testname)
Definition plugin.py:191
write(self, str test, str suffix, Snapshot snapshot, CallStack call_stack, SnapshotFileLayout layout)
Definition plugin.py:415
keep(self, str test, Optional[str] suffix_or_all)
Definition plugin.py:401
__init__(self, PytestSnapshotSystem system, TypedPath test_file)
Definition plugin.py:290
Optional[Snapshot] read(self, str test, str suffix)
Definition plugin.py:422
__assert_in_progress(self, str testname)
Definition plugin.py:341
pytest_addoption(parser)
Definition plugin.py:465
ArrayMap[str, WithinTestGC] find_test_methods_that_didnt_run(TypedPath testfile, ArrayMap[str, WithinTestGC] tests)
Definition plugin.py:459
pytest_runtest_makereport(pytest.CallInfo[None] call, pytest.Item item)
Definition plugin.py:129
delete_file_and_parent_dir_if_empty(TypedPath snapshot_file)
Definition plugin.py:447
pytest_sessionfinish(pytest.Session session, exitstatus)
Definition plugin.py:111
pytest_runtest_protocol(pytest.Item item, Optional[pytest.Item] nextitem)
Definition plugin.py:118
None pytest_collection_modifyitems(pytest.Session session, pytest.Config config, list[pytest.Item] items)
Definition plugin.py:100