27 staleSwapHeaderSize(0),
28 staleSplicingPointNext(-1)
42 if (update.stale || update.fresh)
43 store->map->abortUpdating(update);
74 reader = store->openStoreIO(
78 readMore(
"need swap entry metadata");
90 debugs(47, 5,
"stale chain ends at " << update.stale.splicingPoint <<
91 " body continues at " << staleSplicingPointNext);
100 IoCbParams io(buf, result);
117 bytesRead += result.
size;
118 readerBuffer.rawAppendFinish(result.
buf, result.
size);
119 exchangeBuffer.append(readerBuffer);
120 debugs(47, 7,
"accumulated " << exchangeBuffer.length());
129 debugs(47, 7,
"from " << bytesRead <<
" because " << why);
131 readerBuffer.clear();
133 readerBuffer.rawAppendStart(store->slotSize),
154 debugs(47, 5, errflag <<
" writer=" << writer);
161 mustStop(
"read error");
168 writer = store->createUpdateIO(
178 uint64_t stalePrefixSz = 0;
179 uint64_t freshPrefixSz = 0;
183 const auto &mem = update.entry->mem();
186 debugs(20, 7,
"fresh store meta for " << *update.entry);
187 size_t freshSwapHeaderSize = 0;
194 const auto savedEntrySwapFileSize = update.entry->swap_file_sz;
195 update.entry->swap_file_sz = 0;
196 const auto freshSwapHeader = update.entry->getSerialisedMetaData(freshSwapHeaderSize);
197 update.entry->swap_file_sz = savedEntrySwapFileSize;
199 Must(freshSwapHeader);
200 writer->write(freshSwapHeader, freshSwapHeaderSize, 0,
nullptr);
201 stalePrefixSz += mem.swap_hdr_sz;
202 freshPrefixSz += freshSwapHeaderSize;
203 offset += freshSwapHeaderSize;
204 xfree(freshSwapHeader);
208 debugs(20, 7,
"fresh HTTP header @ " << offset);
209 const auto httpHeader = mem.freshestReply().pack();
210 writer->write(httpHeader->content(), httpHeader->contentSize(), -1,
nullptr);
211 const auto &staleReply = mem.baseReply();
212 Must(staleReply.hdr_sz >= 0);
213 Must(staleReply.hdr_sz > 0);
214 stalePrefixSz += staleReply.hdr_sz;
215 freshPrefixSz += httpHeader->contentSize();
216 offset += httpHeader->contentSize();
221 debugs(20, 7,
"moved HTTP body prefix @ " << offset);
222 writer->write(exchangeBuffer.rawContent(), exchangeBuffer.length(), -1,
nullptr);
223 offset += exchangeBuffer.length();
224 exchangeBuffer.clear();
227 debugs(20, 7,
"wrote " << offset <<
228 "; swap_file_sz delta: -" << stalePrefixSz <<
" +" << freshPrefixSz);
231 auto &swap_file_sz = update.fresh.anchor->basics.swap_file_sz;
232 Must(swap_file_sz >= stalePrefixSz);
233 swap_file_sz -= stalePrefixSz;
234 swap_file_sz += freshPrefixSz;
252 debugs(47, 5, errflag <<
" reader=" << reader);
259 debugs(47, 5,
"fresh chain ends at " << update.fresh.splicingPoint);
260 store->map->closeForUpdating(update);
270 if (!staleSwapHeaderSize) {
273 debugs(47, 7,
"staleSwapHeaderSize=" << staleSwapHeaderSize);
274 Must(staleSwapHeaderSize > 0);
275 exchangeBuffer.consume(staleSwapHeaderSize);
278 const size_t staleHttpHeaderSize =
headersEnd(
279 exchangeBuffer.rawContent(),
280 exchangeBuffer.length());
281 debugs(47, 7,
"staleHttpHeaderSize=" << staleHttpHeaderSize);
282 if (!staleHttpHeaderSize) {
283 readMore(
"need more stale HTTP reply header data");
287 exchangeBuffer.consume(staleHttpHeaderSize);
288 debugs(47, 7,
"httpBodySizePrefix=" << exchangeBuffer.length());
290 stopReading(
"read the last HTTP header slot");
#define CallJobHere1(debugSection, debugLevel, job, Class, method, arg1)
virtual bool doneAll() const
whether positive goal has been reached
Aggregates information required for updating entry metadata and headers.
SlotId splicingPoint
the last db slot successfully read or written
Ipc::StoreMapAnchor * writeableAnchor_
starting point for writing
SlotId staleSplicingPointNext
@ wroteAll
success: caller supplied all data it wanted to swap out
@ readerDone
success or failure: either way, stop swapping in
@ writerGone
failure: caller left before swapping out everything
#define debugs(SECTION, LEVEL, CONTENT)
size_t UnpackSwapMetaSize(const SBuf &)
void storeRead(StoreIOState::Pointer sio, char *buf, size_t size, off_t offset, StoreIOState::STRCB *callback, void *callback_data)