Queue.cc
Go to the documentation of this file.
1/*
2 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9/* DEBUG: section 54 Interprocess Communication */
10
11#include "squid.h"
12#include "base/TextException.h"
13#include "debug/Stream.h"
14#include "globals.h"
15#include "ipc/Queue.h"
16
17#include <limits>
18
20static String
22{
23 id.append("__metadata");
24 return id;
25}
26
28static String
30{
31 id.append("__queues");
32 return id;
33}
34
36static String
38{
39 id.append("__readers");
40 return id;
41}
42
43/* QueueReader */
44
46
47Ipc::QueueReader::QueueReader(): popBlocked(false), popSignal(false),
48 rateLimit(0), balance(0)
49{
50 debugs(54, 7, "constructed " << id);
51}
52
53/* QueueReaders */
54
55Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity),
56 theReaders(theCapacity)
57{
58 Must(theCapacity > 0);
59}
60
61size_t
63{
64 return SharedMemorySize(theCapacity);
65}
66
67size_t
69{
70 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
71}
72
73// OneToOneUniQueue
74
75Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
76 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
77 theCapacity(aCapacity)
78{
80 Must(theCapacity > 0);
81}
82
83int
84Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
85{
86 assert(maxItemSize > 0);
87 size -= sizeof(OneToOneUniQueue);
88 return size >= 0 ? size / maxItemSize : 0;
89}
90
91int
92Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
93{
94 assert(size >= 0);
95 return sizeof(OneToOneUniQueue) + maxItemSize * size;
96}
97
101void
102Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
103{
104 os << "{ size: " << count <<
105 ", capacity: " << theCapacity <<
106 ", " << inLabel << ": " << theIn <<
107 ", " << outLabel << ": " << theOut;
108}
109
111void
112Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
113{
114 os << "}\n";
115}
116
117/* OneToOneUniQueues */
118
119Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
120{
121 Must(theCapacity > 0);
122 for (int i = 0; i < theCapacity; ++i)
123 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
124}
125
126size_t
128{
129 return sizeof(*this) + theCapacity * front().sharedMemorySize();
130}
131
132size_t
133Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
134{
135 const int queueSize =
136 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
137 return sizeof(OneToOneUniQueues) + queueSize * capacity;
138}
139
142{
143 Must(0 <= index && index < theCapacity);
144 const size_t queueSize = index ? front().sharedMemorySize() : 0;
145 const char *const queue =
146 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
147 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
148}
149
150// BaseMultiQueue
151
152Ipc::BaseMultiQueue::BaseMultiQueue(const int aLocalProcessId):
153 theLocalProcessId(aLocalProcessId),
154 theLastPopProcessId(std::numeric_limits<int>::max() - 1)
155{
156}
157
158void
159Ipc::BaseMultiQueue::clearReaderSignal(const int /*remoteProcessId*/)
160{
161 // Unused remoteProcessId may be useful for at least two optimizations:
162 // * TODO: After QueueReader::popSignal is moved to each OneToOneUniQueue,
163 // we could clear just the remoteProcessId popSignal, further reducing the
164 // number of UDS notifications writers have to send.
165 // * We could adjust theLastPopProcessId to try popping from the
166 // remoteProcessId queue first. That does not seem to help much and might
167 // introduce some bias, so we do not do that for now.
168 clearAllReaderSignals();
169}
170
171void
173{
174 QueueReader &reader = localReader();
175 debugs(54, 7, "reader: " << reader.id);
176 reader.clearSignal();
177}
178
180Ipc::BaseMultiQueue::balance(const int remoteProcessId) const
181{
182 const QueueReader &r = remoteReader(remoteProcessId);
183 return r.balance;
184}
185
187Ipc::BaseMultiQueue::rateLimit(const int remoteProcessId) const
188{
189 const QueueReader &r = remoteReader(remoteProcessId);
190 return r.rateLimit;
191}
192
194Ipc::BaseMultiQueue::inQueue(const int remoteProcessId)
195{
196 const OneToOneUniQueue &queue =
197 const_cast<const BaseMultiQueue *>(this)->inQueue(remoteProcessId);
198 return const_cast<OneToOneUniQueue &>(queue);
199}
200
202Ipc::BaseMultiQueue::outQueue(const int remoteProcessId)
203{
204 const OneToOneUniQueue &queue =
205 const_cast<const BaseMultiQueue *>(this)->outQueue(remoteProcessId);
206 return const_cast<OneToOneUniQueue &>(queue);
207}
208
211{
212 const QueueReader &reader =
213 const_cast<const BaseMultiQueue *>(this)->localReader();
214 return const_cast<QueueReader &>(reader);
215}
216
218Ipc::BaseMultiQueue::remoteReader(const int remoteProcessId)
219{
220 const QueueReader &reader =
221 const_cast<const BaseMultiQueue *>(this)->remoteReader(remoteProcessId);
222 return const_cast<QueueReader &>(reader);
223}
224
225// FewToFewBiQueue
226
228Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
229{
230 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
231}
232
233Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
234 BaseMultiQueue(aLocalProcessId),
235 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
236 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
237 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
238 theLocalGroup(aLocalGroup)
239{
240 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
241 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
242
243 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
244}
245
246int
247Ipc::FewToFewBiQueue::MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
248{
249 return capacity * groupASize * groupBSize * 2;
250}
251
252bool
253Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
254{
255 switch (group) {
256 case groupA:
257 return metadata->theGroupAIdOffset <= processId &&
258 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
259 case groupB:
260 return metadata->theGroupBIdOffset <= processId &&
261 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
262 }
263 return false;
264}
265
266int
267Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
268{
269 Must(fromGroup != toGroup);
270 assert(validProcessId(fromGroup, fromProcessId));
271 assert(validProcessId(toGroup, toProcessId));
272 int index1;
273 int index2;
274 int offset;
275 if (fromGroup == groupA) {
276 index1 = fromProcessId - metadata->theGroupAIdOffset;
277 index2 = toProcessId - metadata->theGroupBIdOffset;
278 offset = 0;
279 } else {
280 index1 = toProcessId - metadata->theGroupAIdOffset;
281 index2 = fromProcessId - metadata->theGroupBIdOffset;
282 offset = metadata->theGroupASize * metadata->theGroupBSize;
283 }
284 const int index = offset + index1 * metadata->theGroupBSize + index2;
285 return index;
286}
287
289Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
290{
291 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
292}
293
295Ipc::FewToFewBiQueue::inQueue(const int remoteProcessId) const
296{
297 return oneToOneQueue(remoteGroup(), remoteProcessId,
298 theLocalGroup, theLocalProcessId);
299}
300
302Ipc::FewToFewBiQueue::outQueue(const int remoteProcessId) const
303{
304 return oneToOneQueue(theLocalGroup, theLocalProcessId,
305 remoteGroup(), remoteProcessId);
306}
307
308int
309Ipc::FewToFewBiQueue::readerIndex(const Group group, const int processId) const
310{
311 Must(validProcessId(group, processId));
312 return group == groupA ?
313 processId - metadata->theGroupAIdOffset :
314 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
315}
316
317const Ipc::QueueReader &
319{
320 return readers->theReaders[readerIndex(theLocalGroup, theLocalProcessId)];
321}
322
323const Ipc::QueueReader &
324Ipc::FewToFewBiQueue::remoteReader(const int processId) const
325{
326 return readers->theReaders[readerIndex(remoteGroup(), processId)];
327}
328
329int
331{
332 return theLocalGroup == groupA ? metadata->theGroupBSize :
333 metadata->theGroupASize;
334}
335
336int
338{
339 return theLocalGroup == groupA ? metadata->theGroupBIdOffset :
340 metadata->theGroupAIdOffset;
341}
342
343Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
344 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
345 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
346{
347 Must(theGroupASize > 0);
348 Must(theGroupBSize > 0);
349}
350
351Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
352 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
353 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
354 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
355{
356}
357
359{
360 delete metadataOwner;
361 delete queuesOwner;
362 delete readersOwner;
363}
364
365// MultiQueue
366
368Ipc::MultiQueue::Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
369{
370 return new Owner(id, processCount, processIdOffset, maxItemSize, capacity);
371}
372
373Ipc::MultiQueue::MultiQueue(const String &id, const int localProcessId):
374 BaseMultiQueue(localProcessId),
375 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
376 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
377 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf()))
378{
379 Must(queues->theCapacity == metadata->theProcessCount * metadata->theProcessCount);
380 Must(readers->theCapacity == metadata->theProcessCount);
381
382 debugs(54, 7, "queue " << id << " reader: " << localReader().id);
383}
384
385bool
386Ipc::MultiQueue::validProcessId(const int processId) const
387{
388 return metadata->theProcessIdOffset <= processId &&
389 processId < metadata->theProcessIdOffset + metadata->theProcessCount;
390}
391
393Ipc::MultiQueue::oneToOneQueue(const int fromProcessId, const int toProcessId) const
394{
395 assert(validProcessId(fromProcessId));
396 assert(validProcessId(toProcessId));
397 const int fromIndex = fromProcessId - metadata->theProcessIdOffset;
398 const int toIndex = toProcessId - metadata->theProcessIdOffset;
399 const int index = fromIndex * metadata->theProcessCount + toIndex;
400 return (*queues)[index];
401}
402
403const Ipc::QueueReader &
404Ipc::MultiQueue::reader(const int processId) const
405{
406 assert(validProcessId(processId));
407 const int index = processId - metadata->theProcessIdOffset;
408 return readers->theReaders[index];
409}
410
412Ipc::MultiQueue::inQueue(const int remoteProcessId) const
413{
414 return oneToOneQueue(remoteProcessId, theLocalProcessId);
415}
416
418Ipc::MultiQueue::outQueue(const int remoteProcessId) const
419{
420 return oneToOneQueue(theLocalProcessId, remoteProcessId);
421}
422
423const Ipc::QueueReader &
425{
426 return reader(theLocalProcessId);
427}
428
429const Ipc::QueueReader &
430Ipc::MultiQueue::remoteReader(const int processId) const
431{
432 return reader(processId);
433}
434
435int
437{
438 return metadata->theProcessCount;
439}
440
441int
443{
444 return metadata->theProcessIdOffset;
445}
446
447Ipc::MultiQueue::Metadata::Metadata(const int aProcessCount, const int aProcessIdOffset):
448 theProcessCount(aProcessCount), theProcessIdOffset(aProcessIdOffset)
449{
451}
452
453Ipc::MultiQueue::Owner::Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity):
454 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), processCount, processIdOffset)),
455 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), processCount*processCount, maxItemSize, capacity)),
456 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), processCount))
457{
458}
459
461{
462 delete metadataOwner;
463 delete queuesOwner;
464 delete readersOwner;
465}
466
int size
Definition: ModDevPoll.cc:75
#define shm_new(Class)
Definition: Pointer.h:200
#define shm_old(Class)
Definition: Pointer.h:201
static String MetadataId(String id)
constructs Metadata ID from parent queue ID
Definition: Queue.cc:21
InstanceIdDefinitions(Ipc::QueueReader, "ipcQR")
static String QueuesId(String id)
constructs one-to-one queues ID from parent queue ID
Definition: Queue.cc:29
static String ReadersId(String id)
constructs QueueReaders ID from parent queue ID
Definition: Queue.cc:37
#define Must(condition)
Definition: TextException.h:75
#define assert(EX)
Definition: assert.h:17
const QueueReader::Rate & rateLimit(const int remoteProcessId) const
returns reader's rate limit for a given remote process
Definition: Queue.cc:187
void clearAllReaderSignals()
clears all reader notifications received by the local process
Definition: Queue.cc:172
virtual const OneToOneUniQueue & inQueue(const int remoteProcessId) const =0
incoming queue from a given remote process
virtual const QueueReader & localReader() const =0
virtual const QueueReader & remoteReader(const int remoteProcessId) const =0
void clearReaderSignal(const int remoteProcessId)
clears the reader notification received by the local process from the remote process
Definition: Queue.cc:159
BaseMultiQueue(const int aLocalProcessId)
Definition: Queue.cc:152
const QueueReader::Balance & balance(const int remoteProcessId) const
returns reader's balance for a given remote process
Definition: Queue.cc:180
virtual const OneToOneUniQueue & outQueue(const int remoteProcessId) const =0
outgoing queue to a given remote process
Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:351
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:303
int oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:267
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
Definition: Queue.h:304
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:302
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:305
const OneToOneUniQueue & oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
Definition: Queue.cc:289
const QueueReader & remoteReader(const int processId) const override
Definition: Queue.cc:324
const QueueReader & localReader() const override
Definition: Queue.cc:318
static int MaxItemsCount(const int groupASize, const int groupBSize, const int capacity)
maximum number of items in the queue
Definition: Queue.cc:247
int readerIndex(const Group group, const int processId) const
Definition: Queue.cc:309
bool validProcessId(const Group group, const int processId) const
Definition: Queue.cc:253
static Owner * Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:228
int remotesCount() const override
Definition: Queue.cc:330
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:295
int remotesIdOffset() const override
Definition: Queue.cc:337
FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId)
Definition: Queue.cc:233
Owner(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:453
int remotesCount() const override
Definition: Queue.cc:436
const QueueReader & reader(const int processId) const
Definition: Queue.cc:404
static Owner * Init(const String &id, const int processCount, const int processIdOffset, const unsigned int maxItemSize, const int capacity)
Definition: Queue.cc:368
bool validProcessId(const int processId) const
Definition: Queue.cc:386
const QueueReader & localReader() const override
Definition: Queue.cc:424
const Mem::Pointer< Metadata > metadata
shared metadata
Definition: Queue.h:364
const OneToOneUniQueue & outQueue(const int remoteProcessId) const override
outgoing queue to a given remote process
Definition: Queue.cc:418
MultiQueue(const String &id, const int localProcessId)
Definition: Queue.cc:373
const QueueReader & remoteReader(const int remoteProcessId) const override
Definition: Queue.cc:430
const OneToOneUniQueue & oneToOneQueue(const int fromProcessId, const int toProcessId) const
Definition: Queue.cc:393
const OneToOneUniQueue & inQueue(const int remoteProcessId) const override
incoming queue from a given remote process
Definition: Queue.cc:412
const Mem::Pointer< QueueReaders > readers
readers array
Definition: Queue.h:366
const Mem::Pointer< OneToOneUniQueues > queues
unidirectional one-to-one queues
Definition: Queue.h:365
int remotesIdOffset() const override
Definition: Queue.cc:442
const uint32_t theCapacity
maximum number of items, i.e. theBuffer size
Definition: Queue.h:139
static int Items2Bytes(const unsigned int maxItemSize, const int size)
Definition: Queue.cc:92
const unsigned int theMaxItemSize
maximum item size
Definition: Queue.h:138
void statClose(std::ostream &) const
end state reporting started by statOpen()
Definition: Queue.cc:112
OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity)
Definition: Queue.cc:75
static int Bytes2Items(const unsigned int maxItemSize, int size)
Definition: Queue.cc:84
void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const
Definition: Queue.cc:102
shared array of OneToOneUniQueues
Definition: Queue.h:146
OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:119
const OneToOneUniQueue & operator[](const int index) const
Definition: Queue.cc:141
static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
Definition: Queue.cc:133
const int theCapacity
Definition: Queue.h:160
size_t sharedMemorySize() const
Definition: Queue.cc:127
const InstanceId< QueueReader > id
unique ID for debugging which reader is used (works across processes)
Definition: Queue.h:67
Balance balance
how far ahead the reader is compared to a perfect read/sec event rate
Definition: Queue.h:64
void clearSignal()
marks sent reader notification as received (also removes pop blocking)
Definition: Queue.h:50
Rate rateLimit
pop()s per second limit if positive
Definition: Queue.h:58
std::atomic< int > Rate
pop()s per second
Definition: Queue.h:57
AtomicSignedMsec Balance
Definition: Queue.h:62
shared array of QueueReaders
Definition: Queue.h:72
const int theCapacity
Definition: Queue.h:78
QueueReaders(const int aCapacity)
Definition: Queue.cc:55
static size_t SharedMemorySize(const int capacity)
Definition: Queue.cc:68
size_t sharedMemorySize() const
Definition: Queue.cc:62
void append(char const *buf, int len)
Definition: String.cc:130
A const & max(A const &lhs, A const &rhs)
#define debugs(SECTION, LEVEL, CONTENT)
Definition: Stream.h:194
STL namespace.
Shared metadata for FewToFewBiQueue.
Definition: Queue.h:250
Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset)
Definition: Queue.cc:343
Shared metadata for MultiQueue.
Definition: Queue.h:324
Metadata(const int aProcessCount, const int aProcessIdOffset)
Definition: Queue.cc:447
const int theProcessCount
Definition: Queue.h:329
int unsigned int
Definition: stub_fd.cc:19

 

Introduction

Documentation

Support

Miscellaneous

Web Site Translations

Mirrors