@clyde_reichert
There are a few approaches to keeping multiple TensorFlow queues synchronized.
Here is an example of how to use tf.train.QueueRunner together with a tf.train.Coordinator to keep two queues synchronized:
import tensorflow as tf

# Define your queues
queue1 = tf.FIFOQueue(...)
queue2 = tf.FIFOQueue(...)

# Define enqueue operations
enqueue_op1 = queue1.enqueue(...)
enqueue_op2 = queue2.enqueue(...)

# Create one QueueRunner per queue (a QueueRunner manages a single queue)
queue_runner1 = tf.train.QueueRunner(queue1, [enqueue_op1])
queue_runner2 = tf.train.QueueRunner(queue2, [enqueue_op2])

# Initialize the variables
init_op = tf.global_variables_initializer()

with tf.Session() as sess:
    # Initialize the variables
    sess.run(init_op)

    # Start the QueueRunner threads
    coord = tf.train.Coordinator()
    queue_threads = queue_runner1.create_threads(sess, coord=coord, start=True)
    queue_threads += queue_runner2.create_threads(sess, coord=coord, start=True)

    # Run your training loop
    for step in range(num_steps):
        # Perform your computations on ops that dequeue from the queues
        sess.run(...)

    # Stop the QueueRunner threads and wait for them to finish
    coord.request_stop()
    coord.join(queue_threads)
Note that in the example above, the QueueRunner threads take care of enqueuing elements into the queues in the background. This allows you to call sess.run() on the operations that dequeue elements, or that depend on dequeued elements, without having to fill the queues yourself.
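For instance, the sess.run(...) placeholder in the training loop could fetch ops that dequeue from both queues in a single call, so the two queues advance in lockstep. The following is only a minimal, self-contained sketch: the scalar float dtypes, capacities, and random enqueue values are assumptions for illustration, not part of the original example.

import tensorflow as tf

# Hypothetical concrete queues: both hold scalar floats (assumed dtypes and capacities).
queue1 = tf.FIFOQueue(capacity=100, dtypes=tf.float32)
queue2 = tf.FIFOQueue(capacity=100, dtypes=tf.float32)

# Assumed data sources: random scalars stand in for real inputs.
enqueue_op1 = queue1.enqueue(tf.random_normal([]))
enqueue_op2 = queue2.enqueue(tf.random_normal([]))

# One QueueRunner per queue, as in the example above.
queue_runner1 = tf.train.QueueRunner(queue1, [enqueue_op1])
queue_runner2 = tf.train.QueueRunner(queue2, [enqueue_op2])

# Ops that depend on dequeued elements; fetching them in one sess.run()
# pulls exactly one element from each queue per step.
x1 = queue1.dequeue()
x2 = queue2.dequeue()
result = x1 + x2

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    queue_threads = queue_runner1.create_threads(sess, coord=coord, start=True)
    queue_threads += queue_runner2.create_threads(sess, coord=coord, start=True)

    for step in range(10):
        print(sess.run(result))

    coord.request_stop()
    coord.join(queue_threads)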
@clyde_reichert
Additionally, you can use tf.train.string_input_producer to enqueue multiple filenames into a string queue, and then use a reader such as tf.WholeFileReader or tf.TextLineReader to read the contents of the files. Below is an example of how to synchronize multiple queues that read inputs from multiple files:
import tensorflow as tf

# List of input filenames
filenames = ['file1.csv', 'file2.csv', 'file3.csv']

# Create a string input producer to enqueue the filenames
filename_queue = tf.train.string_input_producer(filenames, shuffle=True)

# Define how to read each file line by line
reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Create multiple queues to store the read values
queue1 = tf.FIFOQueue(...)
queue2 = tf.FIFOQueue(...)

# Enqueue the values read from the files into the queues
enqueue_op1 = queue1.enqueue(value)
enqueue_op2 = queue2.enqueue(value)

# Register a QueueRunner for each queue so start_queue_runners() keeps them filled
tf.train.add_queue_runner(tf.train.QueueRunner(queue1, [enqueue_op1]))
tf.train.add_queue_runner(tf.train.QueueRunner(queue2, [enqueue_op2]))

# Define the batch size for dequeueing
batch_size = 32

# Dequeue batch_size elements from each queue
dequeue_op1 = queue1.dequeue_many(batch_size)
dequeue_op2 = queue2.dequeue_many(batch_size)

# Start a session to run the operations
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # Run both dequeue operations in a single call to keep the queues synchronized
    for step in range(num_steps):
        values1, values2 = sess.run([dequeue_op1, dequeue_op2])
        # Perform computations using the dequeued values

    coord.request_stop()
    coord.join(threads)
In this example, the filenames are enqueued by the string input producer, and each file is read line by line by the TextLineReader. Every line is enqueued into two different queues, and the queues are dequeued in batches within a single sess.run() call, which keeps them synchronized. This approach allows you to read inputs from multiple files while keeping the queues in step during training.
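For reference, here is a sketch of what the tf.FIFOQueue(...) placeholders above could look like in practice. It stores the raw CSV lines as scalar strings and parses them with tf.decode_csv after dequeuing; the capacities, the single-float-column record format, and the existence of the listed files are assumptions for illustration only.

import tensorflow as tf

# Filenames taken from the example above; assumed to exist on disk.
filenames = ['file1.csv', 'file2.csv', 'file3.csv']
filename_queue = tf.train.string_input_producer(filenames, shuffle=True)

reader = tf.TextLineReader()
key, value = reader.read(filename_queue)

# Hypothetical concrete queues: each holds raw CSV lines as scalar strings.
queue1 = tf.FIFOQueue(capacity=1000, dtypes=[tf.string], shapes=[[]])
queue2 = tf.FIFOQueue(capacity=1000, dtypes=[tf.string], shapes=[[]])

enqueue_op1 = queue1.enqueue(value)
enqueue_op2 = queue2.enqueue(value)
tf.train.add_queue_runner(tf.train.QueueRunner(queue1, [enqueue_op1]))
tf.train.add_queue_runner(tf.train.QueueRunner(queue2, [enqueue_op2]))

batch_size = 32
lines1 = queue1.dequeue_many(batch_size)
lines2 = queue2.dequeue_many(batch_size)

# Assumed record format: each CSV line contains a single float column.
col1 = tf.decode_csv(lines1, record_defaults=[[0.0]])[0]
col2 = tf.decode_csv(lines2, record_defaults=[[0.0]])[0]

with tf.Session() as sess:
    coord = tf.train.Coordinator()
    threads = tf.train.start_queue_runners(sess=sess, coord=coord)

    # One synchronized batch from each queue
    values1, values2 = sess.run([col1, col2])
    print(values1, values2)

    coord.request_stop()
    coord.join(threads)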