Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1""" 

2Router for Config routes 

3""" 

4 

5import subprocess 

6import tarfile 

7from fnmatch import fnmatch 

8from hashlib import sha256 

9from io import BytesIO 

10from os import DirEntry, scandir 

11from typing import Any, Dict, List, Optional, Set 

12 

13from fastapi import APIRouter, Body, File, status 

14from fastapi.datastructures import UploadFile 

15from fastapi.exceptions import HTTPException 

16from fastapi.responses import FileResponse 

17from pydantic.types import DirectoryPath, FilePath 

18from starlette.responses import Response 

19 

20from serverctl_deployd.models.config import (ConfigBucket, ListConfigBucket, 

21 UpdateCommand) 

22from serverctl_deployd.models.exceptions import GenericError 

23 

24 

25def _list_files(path: DirectoryPath, 

26 patterns: Optional[Set[str]]) -> List[DirEntry[str]]: 

27 """ 

28 Get a list of DirEntry objects for a path, 

29 to be used by other functions 

30 """ 

31 file_list: List[DirEntry[str]] = [] 

32 with scandir(path) as listing: 

33 for entry in listing: 

34 if entry.is_file(): 

35 if not patterns or not any( 

36 fnmatch(entry.name, pattern) 

37 for pattern in patterns 

38 ): 

39 file_list.append(entry) 

40 return file_list 

41 

42 

43def _get_hashes(path: DirectoryPath, 

44 patterns: Optional[Set[str]]) -> Dict[str, str]: 

45 """ 

46 Get hashes of files in a directory whose file names do not 

47 match the glob patterns 

48 """ 

49 file_hash_list: Dict[str, str] = {} 

50 for entry in _list_files(path, patterns): 

51 file_hash = sha256() 

52 byte_array = bytearray(128 * 1024) 

53 memory_view = memoryview(byte_array) 

54 with open(entry.path, 'rb', buffering=0) as conf_file: 

55 for buffer_size in iter( 

56 lambda cf=conf_file, mv=memory_view: # type: ignore 

57 cf.readinto(mv), 0 

58 ): 

59 file_hash.update(memory_view[:buffer_size]) 

60 file_hash_str = file_hash.hexdigest() 

61 file_hash_list.update({entry.name: file_hash_str}) 

62 return file_hash_list 

63 

64 

65router = APIRouter( 

66 prefix="/config/buckets", 

67 tags=["config"] 

68) 

69 

70 

71@router.post( 

72 "/", 

73 responses={ 

74 status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": GenericError} 

75 }, 

76 response_model=Dict[str, str] 

77) 

78def validate_bucket(config_bucket: ConfigBucket) -> Dict[str, str]: 

79 """ 

80 Checks if the directory path and config updation command are valid. 

81 If valid then returns a list of files and its hashes 

82 """ 

83 if config_bucket.update_command: 

84 try: 

85 subprocess.run( 

86 config_bucket.update_command, 

87 shell=True, 

88 executable="/bin/bash", 

89 check=True 

90 ) 

91 except subprocess.SubprocessError as execution_error: 

92 raise HTTPException( 

93 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

94 detail="Internal server error" 

95 ) from execution_error 

96 return _get_hashes( 

97 config_bucket.directory_path, 

98 config_bucket.ignore_patterns 

99 ) 

100 

101 

102@router.post("/files", response_model=List[str]) 

103def list_filenames(config_bucket: ListConfigBucket) -> List[str]: 

104 """Return list of file names for a config bucket""" 

105 filename_list = [ 

106 entry.name for entry in _list_files( 

107 config_bucket.directory_path, 

108 config_bucket.ignore_patterns 

109 ) 

110 ] 

111 return filename_list 

112 

113 

114@router.post("/check", response_model=Dict[str, str]) 

115def get_hashes(config_bucket: ListConfigBucket) -> Dict[str, str]: 

116 """ 

117 Return list of file names for a bucket and their sha256 checksums 

118 which will be verified by the serverctl API 

119 """ 

120 return _get_hashes( 

121 config_bucket.directory_path, 

122 config_bucket.ignore_patterns 

123 ) 

124 

125 

126@router.post( 

127 "/backup", 

128 responses={ 

129 status.HTTP_200_OK: { 

130 "content": {"application/x-tar": {}} 

131 } 

132 }, 

133 response_class=Response 

134) 

135def get_tar_archive( 

136 config_bucket: ListConfigBucket 

137) -> Response: 

138 """Returns the tar archive of config folder for backup""" 

139 file_list = _list_files( 

140 config_bucket.directory_path, 

141 config_bucket.ignore_patterns 

142 ) 

143 size = sum(entry.stat().st_size for entry in file_list) 

144 byte_array = bytearray(size) 

145 file_object = BytesIO(byte_array) 

146 with tarfile.open(mode="w:gz", fileobj=file_object) as tar_file: 

147 for entry in file_list: 

148 tar_file.add(entry.path, arcname=entry.name) 

149 return Response( 

150 content=file_object.getvalue(), 

151 media_type="application/x-tar" 

152 ) 

153 

154 

155@router.get( 

156 "/file", response_class=FileResponse, 

157 responses={ 

158 status.HTTP_200_OK: { 

159 "content": {"text/plain": {}} 

160 } 

161 } 

162) 

163async def get_file(file_path: FilePath) -> FileResponse: 

164 """Returns the requested config file""" 

165 return FileResponse(file_path) 

166 

167 

168@router.put( 

169 "/file", response_class=FileResponse, 

170 responses={ 

171 status.HTTP_200_OK: { 

172 "content": {"text/plain": {}} 

173 }, 

174 status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": GenericError} 

175 } 

176) 

177async def update_file( 

178 file_path: FilePath, 

179 update_command: str = Body(...), 

180 new_file: UploadFile = File(...) 

181) -> FileResponse: 

182 """Updates the requested config file""" 

183 new_content: Any = await new_file.read() 

184 file_path.write_bytes(new_content) 

185 try: 

186 subprocess.run( 

187 update_command, 

188 shell=True, 

189 executable="/bin/bash", 

190 check=True 

191 ) 

192 except subprocess.SubprocessError as execution_error: 

193 raise HTTPException( 

194 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

195 detail="Internal server error" 

196 ) from execution_error 

197 return FileResponse(file_path) 

198 

199 

200@router.delete( 

201 "/file", 

202 responses={ 

203 status.HTTP_500_INTERNAL_SERVER_ERROR: {"model": GenericError} 

204 }, 

205 status_code=status.HTTP_204_NO_CONTENT 

206) 

207def delete_file( 

208 file_path: FilePath, 

209 update_command: UpdateCommand 

210) -> Response: 

211 """Deletes the requested config file""" 

212 file_path.unlink() 

213 try: 

214 subprocess.run( 

215 update_command.update_command, 

216 shell=True, 

217 executable="/bin/bash", 

218 check=True 

219 ) 

220 except subprocess.SubprocessError as execution_error: 

221 raise HTTPException( 

222 status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, 

223 detail="Internal server error" 

224 ) from execution_error 

225 return Response(status_code=status.HTTP_204_NO_CONTENT)