14 FLAG_BASE64 =
" ═╗ base64"
15 name_esc = PerCharacterEscaper.specified_escape(
"\\\\[(])\nn\tt╔┌╗┐═─")
16 body_esc = PerCharacterEscaper.self_escape(
"\ud801\udf43\ud801\udf41")
20 self.
line: Optional[str] =
None
29 nextLineCheckForBase64: Optional[str] = self.
__next_line()
30 if nextLineCheckForBase64
is None:
32 is_base64: bool = self.
FLAG_BASE64 in nextLineCheckForBase64
36 buffer: list[str] = []
38 def consumer(line: str) ->
None:
40 if len(line) >= 2
and ord(line[0]) == 0xD801
and ord(line[1]) == 0xDF41:
42 buffer.append(line[2:])
49 raw_string: str =
"" if buffer.__len__() == 0
else (
"".join(buffer))[:-1]
53 decoded_bytes: bytes = base64.b64decode(raw_string)
54 return SnapshotValue.of(decoded_bytes)
56 return SnapshotValue.of(self.
body_esc.unescape(raw_string))
63 def __scan_value(self, consumer: Callable[[str],
None]) ->
None:
67 and nextLine.find(SnapshotValueReader.KEY_FIRST_CHAR) != 0
73 def __next_key(self) -> Optional[str]:
77 start_index: int = line.find(self.
KEY_START)
78 end_index: int = line.find(self.
KEY_END)
81 self.
line_reader, f
"Expected to start with '{self.KEY_START}'"
85 self.
line_reader, f
"Expected to contain '{self.KEY_END}'"
87 key: str = line[start_index + len(self.
KEY_START) : end_index]
88 if key.startswith(
" ")
or key.endswith(
" "):
89 space_type =
"Leading" if key.startswith(
" ")
else "Trailing"
91 self.
line_reader, f
"{space_type} spaces are disallowed: '{key}'"
95 def __next_line(self) -> Optional[str]:
100 def __reset_line(self) -> None:
104 def of(cls, content: str) ->
"SnapshotValueReader":
105 return cls(LineReader.for_string(content))
108 def of_binary(cls, content: bytes) ->
"SnapshotValueReader":
109 return cls(LineReader.for_binary(content))